This notebook seeks to explore the gender diversity of the different apache projects & the process


In [1]:
# Kind of a hack because of the Spark notebook serialization issues
!rm lazy_helpers.py*
!wget https://raw.githubusercontent.com/holdenk/diversity-analytics/master/lazy_helpers.py


--2018-08-22 22:21:02--  https://raw.githubusercontent.com/holdenk/diversity-analytics/master/lazy_helpers.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1397 (1.4K) [text/plain]
Saving to: ‘lazy_helpers.py’

lazy_helpers.py     100%[===================>]   1.36K  --.-KB/s    in 0s      

2018-08-22 22:21:03 (45.8 MB/s) - ‘lazy_helpers.py’ saved [1397/1397]


In [2]:
#Hack to update sparklingml on a running cluster -
#TODO(holden): release sparkling ml properly so no hacks
memory_status_count = sc._jsc.sc().getExecutorMemoryStatus().size()
estimated_executors = max(sc.defaultParallelism, memory_status_count)
rdd = sc.parallelize(range(estimated_executors))
def do_update(x):
    import os
    return str(os.popen("whoami && cd /sparklingml && git pull && git log -n 5").read())
result = rdd.map(do_update)
result.collect()


Out[2]:
['yarn\n',
 'yarn\n',
 'yarn\n',
 'yarn\n',
 'yarn\n',
 'yarn\n',
 'yarn\n',
 'yarn\n']

In [ ]:


In [ ]:


In [3]:
import os
os.environ['PATH'] = os.environ['PATH'] + ":/usr/lib/chromium/"

In [4]:
from pyspark import *
from pyspark.sql import *
from pyspark.sql.functions import concat, collect_set, explode, from_json, format_string
from pyspark.sql import functions as F
from pyspark.sql.session import *
from pyspark.sql.types import *

import json
import os
import meetup.api
from copy import copy
import time
import logging
import subprocess

API key configuration


In [ ]:


In [5]:
# Load gh_api_token & meetup_key & genderize_key
exec(open("./secrets.literal").read())
gh_user = "holdenk"
fs_prefix = "gs://boo-stuff/"

In [ ]:

Less secret configuration


In [6]:
max_meetup_events = 800

In [7]:
SparkSession.builder.getOrCreate().stop()

In [8]:
session = (SparkSession.builder
           .appName("whatCanWeLearnFromTheSixties")
           .config("spark.executor.instances", "45")
           .config("spark.driver.memoryOverhead", "0.25")
           .config("spark.executor.memory", "16g")
           .config("spark.dynamicAllocation.enabled", "false")
           .config("spark.ui.enabled", "true")
          ).getOrCreate()
sc = session.sparkContext

In [9]:
# In _theory_ in preview dataproc Spark UI is force disabled but history fills the gap, except history server isn't started by default :(
sc.getConf().get("spark.yarn.historyServer.address")


Out[9]:
'holden-magic-m:18080'

The first thing we want to get is the committers and PMC members, this information is stored in LDAP but also available in JSON. Eventually we will want to enrich this with mailing list information


In [10]:
def loadFlatJsonFile(path, explodeKey, schema=None):
    """Load a flat multi-line json file and convert into Spark & explode"""
    rdd = sc.wholeTextFiles(path).values().setName("Input file {}".format(path))
    df = (session.read.schema(schema)
            .json(rdd))
    return df.select(explode(explodeKey))

In [11]:
apache_people_schema = StructType([StructField("lastCreateTimestamp", StringType()),
                     StructField("people",
                                 MapType(StringType(), 
                                         StructType([StructField('name', StringType()),
                                                     StructField('key_fingerprints', ArrayType(StringType())),
                                                     StructField('urls', ArrayType(StringType())),
                                                    ]))
                                )])
apache_poeple_df_file = "{0}{1}".format(fs_prefix, "http_data_sources/public_ldap_people.json") # http://people.apache.org/public/public_ldap_people.json
apache_people_df = loadFlatJsonFile(path=apache_poeple_df_file, 
                                 explodeKey="people", schema=apache_people_schema)
apache_people_df = apache_people_df.select(apache_people_df.key.alias("username"), apache_people_df.value.alias("extra")).repartition(100).persist().alias("apache_people")
apache_people_df.alias("Apache Committers")


Out[11]:
DataFrame[username: string, extra: struct<name:string,key_fingerprints:array<string>,urls:array<string>>]

In [12]:
sc.addFile("lazy_helpers.py")

In [13]:
# Construct a lazy urllib3 pool
from lazy_helpers import *
    
bcast_pool = sc.broadcast(LazyPool)
bcast_pool.value


Out[13]:
lazy_helpers.LazyPool

In [14]:
def project_on_github(project):
    """Returns if a project is on github"""
    import urllib3
    http = bcast_pool.value.get()
    r = http.request('GET', "https://github.com/apache/{0}".format(project))
    return r.status == 200
session.catalog.registerFunction("on_github", project_on_github, BooleanType())
# Except I'm a bad person so....
from pyspark.sql.catalog import UserDefinedFunction
project_on_github_udf = UserDefinedFunction(project_on_github, BooleanType(), "on_github")
session.catalog._jsparkSession.udf().registerPython("on_github", project_on_github_udf._judf)

In [15]:
apache_committees_schema = StructType([StructField("lastCreateTimestamp", StringType()),
                     StructField("committees",
                                 MapType(StringType(), StructType([StructField('roster', ArrayType(StringType())),
                                                                  StructField('modifyTimestamp', StringType()),
                                                                  StructField('createTimestamp', StringType())
                                                                  ])))])
apache_committees_df_file = "{0}{1}".format(fs_prefix, "http_data_sources/public_ldap_committees.json") # http://people.apache.org/public/public_ldap_committees.json
apache_committees_df = loadFlatJsonFile(path=apache_committees_df_file,
                                 explodeKey="committees", schema=apache_committees_schema)
apache_committees_on_github_df = apache_committees_df.filter(project_on_github_udf(apache_committees_df.key))
apache_committees_on_github_df.persist(StorageLevel.MEMORY_AND_DISK)
committee_names_df = apache_committees_on_github_df.select(apache_committees_df.key.alias("project")).alias("apache_committees").repartition(200)
committee_names_df.persist(StorageLevel.MEMORY_AND_DISK)
committee_names_df.alias("Apache Committee Names")
committee_names_df.count()


Out[15]:
163

In [16]:
project_to_user_df = apache_committees_on_github_df.select(
    apache_committees_on_github_df.key.alias("project"),
    explode(apache_committees_on_github_df.value.roster).alias("username"))


user_to_project_df = project_to_user_df.groupBy(project_to_user_df.username).agg(
    collect_set(project_to_user_df.project).alias("projects"))
apache_people_df = apache_people_df.join(user_to_project_df, on="username")
apache_people_df.alias("Apache People joined with projects")


Out[16]:
DataFrame[username: string, extra: struct<name:string,key_fingerprints:array<string>,urls:array<string>>, projects: array<string>]

In [ ]:


In [17]:
apache_people_df.take(1)


Out[17]:
[Row(username='macdonst', extra=Row(name='Simon MacDonald', key_fingerprints=None, urls=None), projects=['cordova'])]

Attempt to fetch relevant past & present meetups for each project - idea based on the listing at https://www.apache.org/events/meetups.html but different code

We want to do a non-blocking count to materialize the meetup RDD because this is slow


In [18]:
# Some async helpers, in Scala we would use AsyncRDDActions but its not currently available in Python
# Support is being considered in https://issues.apache.org/jira/browse/SPARK-20347
def non_blocking_rdd_count(rdd):
    import threading
    def count_magic():
        rdd.count()
    thread = threading.Thread(target=count_magic)
    thread.start()

def non_blocking_rdd_save(rdd, target):
    import threading
    def save_panda():
        rdd.saveAsPickleFile(target)
    thread = threading.Thread(target=save_panda)
    thread.start()

def non_blocking_df_save(df, target):
    import threading
    def save_panda():
        df.write.mode("overwrite").save(target)
    thread = threading.Thread(target=save_panda)
    thread.start()

def non_blocking_df_save_csv(df, target):
    import threading
    def save_panda():
        df.write.format("csv").mode("overwrite") \
            .option("header", "true") \
            .option("quoteAll", "false") \
            .save(target)
    thread = threading.Thread(target=save_panda)
    thread.start()

def non_blocking_df_save_or_load(df, target):
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jvm.java.net.URI(fs_prefix), sc._jsc.hadoopConfiguration())
    success_files = ["{0}/SUCCESS.txt", "{0}/_SUCCESS"]
    if any(fs.exists(sc._jvm.org.apache.hadoop.fs.Path(t.format(target))) for t in success_files):
        print("Reusing")
        return session.read.load(target).persist()
    else:
        print("Saving")
        non_blocking_df_save(df, target)
        return df

def non_blocking_df_save_or_load_csv(df, target):
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jvm.java.net.URI(fs_prefix), sc._jsc.hadoopConfiguration())
    success_files = ["{0}/SUCCESS.txt", "{0}/_SUCCESS"]
    if any(fs.exists(sc._jvm.org.apache.hadoop.fs.Path(t.format(target))) for t in success_files):
        print("Reusing")
        return session.read.format("csv").option("header", "true") \
                .option("inferSchema", "true").load(target).persist()
    else:
        print("Saving")
        non_blocking_df_save_csv(df, target)
        return df

In [19]:
logger = logging.getLogger()
logger.setLevel("WARN")
# For now, this is an avenue of future exploration, AKA holden's doesn't want her meetup API keys banned
def lookup_relevant_meetup(project_name, max_meetup_events=0):
    """Lookup relevant meetups for a specific project."""
    import logging
    import time
    import meetup.api
    logger = logging.getLogger()
    meetup_delay = 30
    meetup_reset_delay = 3600 # 1 hour
    standard_keys = {"text_format": "plain", "trending": "desc=true", "and_text": "true", "city": "san francisco", "country": "usa", "text": "apache " + project_name, "radius": 10000}
    results = {"upcoming": [], "past": []}
    for status in ["upcoming", "past"]:
        keys = copy(standard_keys)
        keys["status"] = status
        count = 200
        base = 0
        while (count == 200 and (max_meetup_events == 0 or base < max_meetup_events)):
            logging.debug("Fetch {0} meetups for {1} on base {2}".format(status, project_name, base))
            project_name = "spark"
            client = client = meetup.api.Client(meetup_key)
            if base > 0:
                keys["page"] = base
            # Manually sleep for meetup_reset_delay on failure, the meetup-api package retry logic sometimes breaks :(
            response = None
            retry_count = 0
            while response is None and retry_count < 10:
                try:
                    response = client.GetOpenEvents(**keys)
                except:
                    response = None
                    retry_count += 1
                    time.sleep(meetup_reset_delay)
                    try:
                        response = client.GetOpenEvents(**keys)
                    except:
                        response = None
            try:
                count = response.meta['count']
                base = base + count
                results[status].append(response.results)
                time.sleep(meetup_delay)
            except:
                count = 0
    return (project_name, results)

In [20]:
project_meetups_rdd = committee_names_df.repartition(500).rdd.map(lambda x: x.project).map(lambda name: lookup_relevant_meetup(name, max_meetup_events))
project_meetups_rdd.setName("Meetup Data RDD")


Out[20]:
Meetup Data RDD PythonRDD[72] at RDD at PythonRDD.scala:48

In [21]:
#project_meetups_rdd.persist(StorageLevel.MEMORY_AND_DISK)
#raw_project_meetups_df = project_meetups_rdd.toDF() 
#raw_project_meetups_df.alias("Project -> meetup dataframe")

In [22]:
#project_meetups_df = non_blocking_df_save_or_load(
#    raw_project_meetups_df, "mini_meetup_data")

In [23]:
#project_meetups_df.show()

In [24]:
#project_meetups_df.schema

For the provided projects attempt to lookup their GitHub


In [25]:
def lookup_project_git(org, project):
    """Returns the project github for a specific project. Assumes project is git hosted"""
    return "https://github.com/{0}/{1}.git".format(org, project)

In [26]:
def fetch_project_github_data(org, project):
    """Fetch the project github data, note this only gets github issues so likely not super useful"""
    from perceval.backends.core.github import GitHub as perceval_github
    gh_backend = perceval_github(owner=org, repository=project, api_token=gh_api_token)
    # The backend return a generator - which is awesome. However since we want to pull this data into Spark 
    def append_project_info(result):
        """Add the project information to the return from perceval"""
        result["project_name"] = project
        return result

    return list(map(append_project_info, gh_backend.fetch()))

In [27]:
def fetch_project_git_data(org, project):
    from perceval.backends.core.git import Git as perceval_git

    git_uri = lookup_project_git(org, project)
    import tempfile
    import shutil
    tempdir = tempfile.mkdtemp()

    def append_project_info(result):
        """Add the project information to the return from perceval"""
        result["project_name"] = project
        return result

    try:
        git_backend = perceval_git(uri=git_uri, gitpath=tempdir + "/repo")
        return list(map(append_project_info, git_backend.fetch()))
    finally:
        shutil.rmtree(tempdir)

Fetch the git history info using perceval


In [28]:
apache_git_project_data_rdd = committee_names_df.repartition(400).rdd.flatMap(lambda row: fetch_project_git_data("apache", row.project))
jupyter_git_project_data_rdd = sc.parallelize([("jupyter", "notebook"), ("nteract", "nteract")]).flatMap(lambda elem: fetch_project_git_data(elem[0], elem[1]))
git_project_data_rdd = apache_git_project_data_rdd.union(jupyter_git_project_data_rdd)
git_project_data_rdd.setName("Perceival GIT dat")


Out[28]:
Perceival GIT dat UnionRDD[84] at union at NativeMethodAccessorImpl.java:0

In [ ]:


In [ ]:


In [29]:
git_project_data_df_raw = git_project_data_rdd.map(lambda row: Row(**row)).toDF().persist()
git_project_data_df = non_blocking_df_save_or_load(git_project_data_df_raw, "{0}/raw_git_data".format(fs_prefix))


/opt/conda/lib/python3.6/importlib/_bootstrap.py:219: RuntimeWarning: numpy.dtype size changed, may indicate binary incompatibility. Expected 96, got 88
  return f(*args, **kwds)
Reusing

In [30]:
git_project_data_df.schema


Out[30]:
StructType(List(StructField(backend_name,StringType,true),StructField(backend_version,StringType,true),StructField(category,StringType,true),StructField(data,MapType(StringType,StringType,true),true),StructField(origin,StringType,true),StructField(perceval_version,StringType,true),StructField(project_name,StringType,true),StructField(tag,StringType,true),StructField(timestamp,DoubleType,true),StructField(updated_on,DoubleType,true),StructField(uuid,StringType,true)))

In [31]:
raw_authors_by_project_and_commit_df = git_project_data_df.select("project_name", "data.Author", "data.CommitDate")

In [32]:
raw_authors_by_project_and_commit_df.show()
raw_authors_by_project_and_commit_df.take(1)


+------------+--------------------+--------------------+
|project_name|              Author|          CommitDate|
+------------+--------------------+--------------------+
|      wicket|Eelco Hillenius <...|Tue Sep 21 18:49:...|
|      wicket|Eelco Hillenius <...|Tue Sep 21 20:13:...|
|      wicket|Martijn Dashorst ...|Wed Sep 22 00:02:...|
|      wicket|Martijn Dashorst ...|Wed Sep 22 06:17:...|
|      wicket|Martijn Dashorst ...|Wed Sep 22 06:44:...|
|      wicket|Eelco Hillenius <...|Wed Sep 22 07:33:...|
|      wicket|Eelco Hillenius <...|Wed Sep 22 09:25:...|
|      wicket|Eelco Hillenius <...|Wed Sep 22 11:03:...|
|      wicket|Eelco Hillenius <...|Wed Sep 22 13:53:...|
|      wicket|Martijn Dashorst ...|Wed Sep 22 19:35:...|
|      wicket|Martijn Dashorst ...|Wed Sep 22 20:38:...|
|      wicket|Martijn Dashorst ...|Wed Sep 22 20:38:...|
|      wicket|Johan Compagner <...|Wed Sep 22 21:20:...|
|      wicket|Johan Compagner <...|Wed Sep 22 21:21:...|
|      wicket|Eelco Hillenius <...|Mon Sep 27 12:36:...|
|      wicket|Eelco Hillenius <...|Sun Oct 3 15:34:2...|
|      wicket|Eelco Hillenius <...|Sun Oct 3 15:34:5...|
|      wicket|Eelco Hillenius <...|Sun Oct 3 15:35:3...|
|      wicket|Eelco Hillenius <...|Sun Oct 3 15:39:3...|
|      wicket|Eelco Hillenius <...|Sun Oct 3 19:18:1...|
+------------+--------------------+--------------------+
only showing top 20 rows

Out[32]:
[Row(project_name='wicket', Author='Eelco Hillenius <ehillenius@apache.org>', CommitDate='Tue Sep 21 18:49:10 2004 +0000')]

In [33]:
@F.pandas_udf(StringType())
def strip_junk(inputSeries):
    """Discard timezone information, who needs that anyways.
    More accurately we don't care about that here since we're looking at a year long window."""
    return inputSeries.apply(lambda x: x.split("+")[0])

@F.pandas_udf(StringType())
def extract_email(inputSeries):
    """Take e-mails of the form Foo Baz<foobaz@baz.com> and turn it into foobaz@baz.com"""
    import re
    def extract_email_record(record):
        try:
            emails = re.findall('<\S+>$', record)
            return emails[0]
        except:
            return record
    
    return inputSeries.apply(extract_email_record)

@F.pandas_udf(StringType())
def extract_name(inputSeries):
    """Take e-mails of the form Foo Baz<foobaz@baz.com> and turn it into the probable name e.g. Foo Baz"""
    import re
    def extract_name_record(record):
        try:
            emails = re.findall('([^<]+)<\S+>$', record)
            return emails[0]
        except:
            return ""
    
    return inputSeries.apply(extract_name_record)

In [34]:
authors_by_project_and_commit_df = raw_authors_by_project_and_commit_df.select(
    "project_name", "Author", extract_email("Author").alias("email"), extract_name("Author").alias("name"),
    F.to_date(strip_junk("CommitDate"), format="EEE MMM d H:mm:ss YYYY ").alias("CommitDate"))

In [35]:
authors_by_project_and_commit_df.schema


Out[35]:
StructType(List(StructField(project_name,StringType,true),StructField(Author,StringType,true),StructField(email,StringType,true),StructField(name,StringType,true),StructField(CommitDate,DateType,true)))

In [ ]:


In [36]:
raw_distinct_authors_latest_commit = authors_by_project_and_commit_df.groupBy(
    "project_name", "email").agg(
    F.last("Author").alias("Author"), F.max("CommitDate").alias("latest_commit"))
raw_distinct_authors_latest_commit.persist()


Out[36]:
DataFrame[project_name: string, email: string, Author: string, latest_commit: date]

In [37]:
distinct_authors_latest_commit = non_blocking_df_save_or_load(
    raw_distinct_authors_latest_commit,
    "{0}distinct_authors_latest_commit_4".format(fs_prefix))


Reusing

In [ ]:


In [38]:
@F.pandas_udf(StringType(), functionType=F.PandasUDFType.SCALAR)
def lookup_github_user_by_email(emails):
    
    import time
    from github import Github
    import backoff
    github_client = Github(gh_user, gh_api_token)
    # In theory PyGithub handles backoff but we have multiple instances/machines.
    @backoff.on_exception(backoff.expo, Exception)
    def inner_lookup_github_user_by_email(email):
        """Lookup github user by e-mail address and returns the github username. Returns None if no user or more than 1 user is found."""
        users = github_client.search_users("{0}".format(email))
        def process_result(users):
            if users.totalCount == 1:
                return list(users).pop().login
            else:
                return ""

        return process_result(users)

    return emails.apply(inner_lookup_github_user_by_email)

In [39]:
authors_to_github_username = distinct_authors_latest_commit.withColumn(
    "github_username",
    lookup_github_user_by_email("email"))

In [40]:
@F.pandas_udf(returnType=StringType(), functionType=F.PandasUDFType.SCALAR)
def fetch_github_user_bio(logins):
    from github import Github
    import time
    github_client = Github(gh_user, gh_api_token)
    import backoff

    @backoff.on_exception(backoff.expo, Exception)
    def individual_fetch_github_user_bio(login):
        if login == None or login == "":
            return ""

        result = github_client.get_user(login=login)
        try:
            return result.bio
        except:
            return ""
    
    return logins.apply(individual_fetch_github_user_bio)

In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [41]:
authors_to_github_username.persist()
authors_to_github_username_saved = non_blocking_df_save_or_load(
    authors_to_github_username,
    "{0}/authors_to_github-10".format(fs_prefix))


Reusing

In [42]:
distinct_authors_latest_commit.schema


Out[42]:
StructType(List(StructField(project_name,StringType,true),StructField(email,StringType,true),StructField(Author,StringType,true),StructField(latest_commit,DateType,true)))

In [43]:
authors_to_github_username_saved.schema


Out[43]:
StructType(List(StructField(project_name,StringType,true),StructField(email,StringType,true),StructField(Author,StringType,true),StructField(latest_commit,DateType,true),StructField(github_username,StringType,true)))

In [44]:
distinct_authors_with_gh = authors_to_github_username_saved.withColumn(
    "new_unique_id",
    F.when(F.col("github_username") != "",
         F.col("github_username")).otherwise(
        F.col("email")))

In [45]:
authors_grouped_by_id = distinct_authors_with_gh.groupBy("project_name", "new_unique_id").agg(
    collect_set(F.col("email")).alias("emails"),
    F.last(F.col("Author")).alias("Author"),
    F.first("github_username").alias("github_username"),
    F.max("latest_commit").alias("latest_commit"))

In [46]:
authors_grouped_by_id.schema


Out[46]:
StructType(List(StructField(project_name,StringType,true),StructField(new_unique_id,StringType,true),StructField(emails,ArrayType(StringType,true),true),StructField(Author,StringType,true),StructField(github_username,StringType,true),StructField(latest_commit,DateType,true)))

In [ ]:


In [47]:
authors_grouped_by_id.persist()
authors_grouped_by_id_saved = non_blocking_df_save_or_load(
    authors_grouped_by_id,
    "{0}/authors_grouped_by_id-3".format(fs_prefix))


Reusing

In [ ]:

Lookup info from crunchbase


In [48]:
os.environ['PATH']


Out[48]:
'/opt/conda/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/chromium/'

In [ ]:


In [49]:
from lazy_helpers import *

bcast_driver = sc.broadcast(LazyDriver)

# TBD if we should see this, see comments on robots.txt in function, also consider overhead of firefox req
def lookup_crunchbase_info(people_and_projects):
    """Lookup a person a crunch base and see what the gender & company is.
    Filter for at least one mention of their projects."""
    # Path hack
    if not "chromium" in os.environ['PATH']:
        os.environ['PATH'] = os.environ['PATH'] + ":/usr/lib/chromium/"
    from bs4 import BeautifulSoup
    import re
    driver = bcast_driver.value.get()
    import time
    import random
    for (username, name, projects, urls) in people_and_projects:
        time.sleep(random.randint(60, 2*60))
        # robots.txt seems to be ok with person for now as of April 4 2018, double check before re-running this
        url = "https://www.crunchbase.com/person/{0}".format(name.replace(" ", "-"))
        try:
            if driver.current_url != url:
                driver.get(url)
            text = driver.page_source
            lower_text = text.lower()
            yield[lower_text]
            if "the quick brown fox jumps over the lazy dog" in lower_text or "pardon our interruption..." in lower_text:
                time.sleep(random.randint(30*60, 2*60*60))
                bcast_driver.value.reset()
            if any(project.lower() in lower_text for project in projects) or any(url.lower in lower_text for url in urls):
                soup = BeautifulSoup(text, "html.parser")
                stats = soup.findAll("div", { "class" : "component--fields-card"})[0]
                # Hacky but I'm lazy
                result = {}
                result["crunchbase-url"] = url
                result["username"] = username
                if "Female" in str(stats):
                    result["gender"] = "Female"
                if "Male" in str(stats):
                    result["gender"] = "Male"
                try:
                    m = re.search("\" title=\"(.+?)\" href=\"\/organization", lower_text)
                    result["company"] = m.group(1)
                except:
                    # No match no foul
                    pass
                yield result
        except Exception as e:
            pass

In [50]:
#result = lookup_crunchbase_info([("holden", "holden karau", ["spark"], ["http://www.holdenkarau.com"])])
#list(result)

Augment the committer info


In [51]:
# We do this as an RDD transformation since the cost of the transformation dominates
relevant_info = apache_people_df.select(
    apache_people_df.username,
    apache_people_df.extra.getField("name").alias("name"),
    apache_people_df.projects,
    apache_people_df.extra.getField("urls").alias("urls"))
crunchbase_info_rdd = relevant_info.rdd.map(lambda row: (row.username, row.name, row.projects, row.urls)).mapPartitions(lookup_crunchbase_info)
crunchbase_info_rdd.persist(StorageLevel.MEMORY_AND_DISK)
schema = StructType([
    StructField("username", StringType()),
    StructField("gender", StringType()),
    StructField("company", StringType()),
    StructField("crunchbase-url", StringType())])
crunchbase_info_df = crunchbase_info_rdd.toDF(schema = schema)
crunchbase_info_df.alias("Crunchbase user information")


Out[51]:
DataFrame[username: string, gender: string, company: string, crunchbase-url: string]

In [52]:
crunchbase_info_df = non_blocking_df_save_or_load(
    crunchbase_info_df,
    "{0}crunchbase_out_11".format(fs_prefix))


Saving

In [53]:
#crunchbase_info_df.count()

In [54]:
apache_people_df.count()


Out[54]:
2565

In [55]:
apache_people_df.schema


Out[55]:
StructType(List(StructField(username,StringType,false),StructField(extra,StructType(List(StructField(name,StringType,true),StructField(key_fingerprints,ArrayType(StringType,true),true),StructField(urls,ArrayType(StringType,true),true))),true),StructField(projects,ArrayType(StringType,true),true)))

In [ ]:


In [ ]:


In [ ]:

Export to Mechnical turk format


In [56]:
def mini_concat_udf(array_strs):
    """Concat the array of strs"""
    if array_strs == None:
        return ""
    else:
        return ' '.join(array_strs)

# Except I'm a bad person so....
from pyspark.sql.catalog import UserDefinedFunction
mini_concat_udf = UserDefinedFunction(mini_concat_udf, StringType(), "mini_concat_udf")
session.catalog._jsparkSession.udf().registerPython("mini_concat_udf", mini_concat_udf._judf)

mini_csv_data_df = apache_people_df.select(
    apache_people_df.username,
    apache_people_df.extra.getField("name").alias("name"),
    mini_concat_udf(apache_people_df.extra.getField("urls")).alias("personal_websites"),
    mini_concat_udf(apache_people_df.projects).alias("projects")
    ).coalesce(1)

In [57]:
mini_csv_data_df = non_blocking_df_save_or_load_csv(
    mini_csv_data_df,
    "{0}/apache_people.csv".format(fs_prefix))


Reusing

In [58]:
#crunchbase_info_rdd.collect()

One of the things that is interesting is understanding what the tones of the meetup descriptions & mailing list posts are. We can use https://www.ibm.com/watson/developercloud/tone-analyzer/api/v3/?python#introduction


In [59]:
# TODO: pandas UDF accelerate (but multiple pieces of informaiton returned at the same time)
def lookup_sentiment(document):
    """Looks up the sentiment for a specific document."""
    from nltk.sentiment.vader import SentimentIntensityAnalyzer

    # Hack to download if needed
    # TODO(holden): Consider broadcast variable?
    try:
        sid = SentimentIntensityAnalyzer()
    except LookupError:
        import nltk
        nltk.download('vader_lexicon')
        sid = SentimentIntensityAnalyzer()

    sid = SentimentIntensityAnalyzer()
    return sid.polarity_scores(document)

In [60]:
lookup_sentiment("Thanks! I still think it needs a bit more work, but.")


Out[60]:
{'neg': 0.0, 'neu': 0.781, 'pos': 0.219, 'compound': 0.3054}

In [61]:
lookup_sentiment("Who fucking broke the build?")


Out[61]:
{'neg': 0.436, 'neu': 0.564, 'pos': 0.0, 'compound': -0.4754}

Ok its time to find some mailing list info


In [62]:
sentiment_schema = StructType([
    StructField("neg", DoubleType()),
    StructField("neu", DoubleType()),
    StructField("pos", DoubleType()),
    StructField("compound", DoubleType())])

lookup_sentiment_udf = UserDefinedFunction(
    lookup_sentiment,
    sentiment_schema,
    "lookup_sentiment_2")

In [63]:
mbox_failures = sc.accumulator(0)

def fetch_mbox_ids(project_name):
    """Return the mbox ids"""
    import itertools

    def fetch_mbox_ids_apache_site(box_type):
        """Fetches all of the mbox ids from a given apache project and box type (dev or user)"""
        root_url = "http://mail-archives.apache.org/mod_mbox/{0}-{1}".format(project_name, box_type)
        
        # Fetch the page to parse
        pool = bcast_pool.value.get()
        result = pool.request('GET', root_url)
        
        
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(result.data, "html.parser")
        mbox_ids = set(map(lambda tag: tag.get('id'), soup.findAll("span", { "class" : "links"})))
        return map(lambda box_id: (project_name, box_type, box_id), mbox_ids)
    # We have to return a list here because PySpark doesn't handle generators (TODO: holden)
    return list(itertools.chain.from_iterable(map(fetch_mbox_ids_apache_site, ["dev", "user"])))
        
        
def fetch_and_process_mbox_records(project_name, box_type, mbox_id):
        import tempfile
        import shutil
        from perceval.backends.core.mbox import MBox as perceval_mbox

        def process_mbox_directory(base_url, dir_path):
            mbox_backend = perceval_mbox(base_url, dir_path)
            return mbox_backend.fetch()
        
        def append_project_info(result):
            """Add the project information to the return from perceval"""
            result["project_name"] = project_name
            result["box_type"] = box_type
            result["mbox_id"] = mbox_id
            return result

        # Make a temp directory to hold the mbox files
        tempdir = tempfile.mkdtemp()

        try:
            root_url = "http://mail-archives.apache.org/mod_mbox/{0}-{1}".format(project_name, box_type)
            mbox_url = "{0}/{1}.mbox".format(root_url, mbox_id)
            filename = "{0}/{1}.mbox".format(tempdir, mbox_id)
        
            print("fetching {0}".format(mbox_url))

            pool = bcast_pool.value.get()
            with pool.request('GET', mbox_url, preload_content=False) as r, open(filename, 'wb') as out_file:       
                try:
                    shutil.copyfileobj(r, out_file)
                    return list(map(append_project_info, process_mbox_directory(root_url, tempdir)))
                except:
                    mbox_failures.add(1)
                    return []
        finally:
            shutil.rmtree(tempdir)

In [ ]:


In [ ]:


In [64]:
def random_key(x):
    import random
    return (random.randint(0, 40000), x)

def de_key(x):
    return x[1]

mailing_list_posts_mbox_ids = committee_names_df.repartition(400).rdd.flatMap(lambda row: fetch_mbox_ids(row.project))
# mbox's can be big, so break up how many partitions we have
mailing_list_posts_mbox_ids = mailing_list_posts_mbox_ids.map(random_key).repartition(2000).map(de_key)
mailing_list_posts_rdd = mailing_list_posts_mbox_ids.flatMap(lambda args: fetch_and_process_mbox_records(*args))
mailing_list_posts_rdd.persist(StorageLevel.MEMORY_AND_DISK)


Out[64]:
PythonRDD[216] at RDD at PythonRDD.scala:48

In [65]:
schema = StructType([
    StructField("project_name",StringType()),
    StructField("box_type",StringType()), # dev or user
    StructField("mbox_id",StringType()),
    StructField("backend_name",StringType()),
    StructField("backend_version",StringType()),
    StructField("category",StringType()),
    StructField("data", MapType(StringType(),StringType())), # The "important" bits
    StructField("origin",StringType()),
    StructField("perceval_version",StringType()),
    StructField("tag",StringType()),
    StructField("timestamp",DoubleType()),
    StructField("updated_on",DoubleType()),
    StructField("uuid",StringType())])
mailing_list_posts_mbox_df_raw = mailing_list_posts_rdd.toDF(schema=schema)
mailing_list_posts_mbox_df_raw.persist(StorageLevel.MEMORY_AND_DISK)
mailing_list_posts_mbox_df_raw.alias("Mailing list perceival information - no post processing")


Out[65]:
DataFrame[project_name: string, box_type: string, mbox_id: string, backend_name: string, backend_version: string, category: string, data: map<string,string>, origin: string, perceval_version: string, tag: string, timestamp: double, updated_on: double, uuid: string]

In [66]:
mailing_list_posts_mbox_df_raw = non_blocking_df_save_or_load(
    mailing_list_posts_mbox_df_raw,
    "{0}mailing_list_info_6".format(fs_prefix))


Reusing

In [ ]:


In [67]:
records = mailing_list_posts_mbox_df_raw.take(5)


Exception in thread Thread-6:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-18-2ddafa47624e>", line 20, in save_panda
    df.write.mode("overwrite").save(target)
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 703, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o790.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 57 in stage 24.0 failed 4 times, most recent failure: Lost task 57.3 in stage 24.0 (TID 1182, holden-magic-sw-q7l8.c.boos-demo-projects-are-rad.internal, executor 39): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 671, in prepare
    verify_func(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1421, in verify
    verify_value(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1400, in verify_struct
    "length of fields (%d)" % (len(obj), len(verifiers))))
ValueError: Length of object (1) does not match with length of fields (4)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
	... 30 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 671, in prepare
    verify_func(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1421, in verify
    verify_value(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1400, in verify_struct
    "length of fields (%d)" % (len(obj), len(verifiers))))
ValueError: Length of object (1) does not match with length of fields (4)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more



In [68]:
records[0]


Out[68]:
Row(project_name='juddi', box_type='dev', mbox_id='200506', backend_name='MBox', backend_version='0.10.2', category='message', data={'Reply-To': '<steve@viens.net>', 'X-MSMail-Priority': 'Normal', 'List-Post': '<mailto:juddi-dev@ws.apache.org>', 'X-MimeOLE': 'Produced By Microsoft MimeOLE V6.00.2900.2527', 'Message-ID': '<003001c5676d$8fdf9d20$6401a8c0@INSPIRON>', 'body': '{plain=jUDDI (in cvs) has been updated with Axis 1.2 final and I plan to stick\nto the original plan that we voted on which is releasing a 0.9rc4 in a\nday or two and a 0.9 final within two weeks ... I\'ll use the two weeks\nto update jUDDI documentation.\n \nSteve\n, html=<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">\n<HTML><HEAD>\n<META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=us-ascii">\n<TITLE>Message</TITLE>\n\n<META content="MSHTML 6.00.2900.2627" name=GENERATOR></HEAD>\n<BODY>\n<DIV><FONT face=Arial size=2><SPAN class=894221712-02062005>jUDDI (in cvs) has \nbeen updated with Axis 1.2 final and I plan to stick to the original plan that \nwe voted on which is releasing a 0.9rc4 in a day or two and a 0.9 final within \ntwo weeks ... I\'ll use the two weeks to update jUDDI \ndocumentation.</SPAN></FONT></DIV>\n<DIV><FONT face=Arial size=2><SPAN \nclass=894221712-02062005></SPAN></FONT>&nbsp;</DIV>\n<DIV><FONT face=Arial size=2><SPAN \nclass=894221712-02062005>Steve</SPAN></FONT></DIV></BODY></HTML>\n}', 'Received-SPF': 'neutral (hermes.apache.org: local policy)', 'Importance': 'Normal', 'list-help': '<mailto:juddi-dev-help@ws.apache.org>', 'X-Mailer': 'Microsoft Outlook, Build 10.0.2627', 'To': '<juddi-dev@ws.apache.org>', 'X-Virus-Checked': 'Checked', 'Mailing-List': 'contact juddi-dev-help@ws.apache.org; run by ezmlm', 'Content-Type': 'multipart/alternative;\n\tboundary="----=_NextPart_000_0031_01C5674C.08CDFD20"', 'X-Spam-Rating': 'minotaur.apache.org 1.6.2 0/1000/N', 'Return-Path': '<juddi-dev-return-942-apmail-ws-juddi-dev-archive=ws.apache.org@ws.apache.org>', 'list-unsubscribe': '<mailto:juddi-dev-unsubscribe@ws.apache.org>', 'Received': 'from inspiron (c-24-128-68-179.hsd1.nh.comcast.net[24.128.68.179])\n          by comcast.net (sccrmhc12) with SMTP\n          id <20050602122123012000qtt7e>; Thu, 2 Jun 2005 12:21:23 +0000', 'From': '"Steve Viens" <steve@viens.net>', 'Date': 'Thu, 2 Jun 2005 08:21:05 -0400', 'MIME-Version': '1.0', 'Subject': 'jUDDI updated with Axis 1.2 final', 'X-Spam-Check-By': 'apache.org', 'X-Priority': '3 (Normal)', 'Delivered-To': 'mailing list juddi-dev@ws.apache.org', 'List-Id': '<juddi-dev.ws.apache.org>', 'Precedence': 'bulk', 'unixfrom': 'juddi-dev-return-942-apmail-ws-juddi-dev-archive=ws.apache.org@ws.apache.org Thu Jun 02 12:21:44 2005', 'X-ASF-Spam-Status': 'No, hits=0.0 required=10.0\n\ttests=HTML_60_70,HTML_MESSAGE'}, origin='http://mail-archives.apache.org/mod_mbox/juddi-dev', perceval_version='0.11.2', tag='http://mail-archives.apache.org/mod_mbox/juddi-dev', timestamp=1533685140.170142, updated_on=1117714865.0, uuid='9fb9a57324ec482149ec09fd6922b5f32f4d90b7')

In [69]:
mailing_list_posts_mbox_df = mailing_list_posts_mbox_df_raw.select(
    "*",
    mailing_list_posts_mbox_df_raw.data.getField("From").alias("from"),
    extract_email(mailing_list_posts_mbox_df_raw.data.getField("From")).alias("from_processed_email"),
    mailing_list_posts_mbox_df_raw.data.getField("body").alias("body"),
    mailing_list_posts_mbox_df_raw.data.getField("Message-ID").alias("message_id"),
    mailing_list_posts_mbox_df_raw.data.getField("In-Reply-To").alias("in_reply_to"),
    mailing_list_posts_mbox_df_raw.data.getField("Content-Language").alias("content_language")
    )

In [70]:
mailing_list_posts_mbox_df_saved = non_blocking_df_save_or_load(
    mailing_list_posts_mbox_df,
    "{0}/processed_mbox_data_4".format(fs_prefix))


Reusing

In [71]:
post_sentiment_df = mailing_list_posts_mbox_df_saved.select("project_name", lookup_sentiment_udf("body").alias("sentiment"))

In [191]:
post_sentiment_df_saved = non_blocking_df_save_or_load(
    post_sentiment_df,
    "{0}/post_sentiment_df_1".format(fs_prefix))


Reusing

In [241]:
agg_post_sentiment = post_sentiment_df_saved.groupBy("project_name").agg(
    F.max("sentiment.neg").alias("sentiment.neg_max"),
    F.avg("sentiment.neg").alias("sentiment.neg_avg"), F.expr('percentile_approx(sentiment.neg, array(0.25, 0.5, 0.75))').alias("neg_quantiles"),
    F.max("sentiment.pos").alias("sentiment.pos_max"), F.avg("sentiment.pos").alias("sentiment.pos_avg"), F.expr('percentile_approx(sentiment.neg, array(0.25, 0.5, 0.75))').alias("pos_quantiles"),
).select(
    "*",
    F.col("project_name").alias("project"),
    F.expr("neg_quantiles[0]").alias("sentiment.neg_25quantile"),
    F.expr("neg_quantiles[1]").alias("sentiment.neg_50quantile"),
    F.expr("neg_quantiles[2]").alias("sentiment.neg_75quantile"),
    F.expr("pos_quantiles[0]").alias("sentiment.pos_25quantile"),
    F.expr("pos_quantiles[1]").alias("sentiment.pos_50quantile"),
    F.expr("pos_quantiles[2]").alias("sentiment.pos_75quantile")).cache()

In [242]:
agg_post_sentiment.show()


+------------+-----------------+--------------------+--------------------+-----------------+-------------------+--------------------+------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|project_name|sentiment.neg_max|   sentiment.neg_avg|       neg_quantiles|sentiment.pos_max|  sentiment.pos_avg|       pos_quantiles|     project|sentiment.neg_25quantile|sentiment.neg_50quantile|sentiment.neg_75quantile|sentiment.pos_25quantile|sentiment.pos_50quantile|sentiment.pos_75quantile|
+------------+-----------------+--------------------+--------------------+-----------------+-------------------+--------------------+------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|      roller|            0.192| 0.03653448275862067|[0.009, 0.026, 0....|            0.297|0.08503940886699507|[0.009, 0.026, 0....|      roller|                   0.009|                   0.026|                   0.053|                   0.009|                   0.026|                   0.053|
|       twill|            0.148| 0.02367897727272727|  [0.0, 0.02, 0.035]|            0.337|0.06119744318181825|  [0.0, 0.02, 0.035]|       twill|                     0.0|                    0.02|                   0.035|                     0.0|                    0.02|                   0.035|
|    marmotta|            0.249|0.030172535211267552| [0.0, 0.023, 0.045]|            0.294|0.07125704225352113| [0.0, 0.023, 0.045]|    marmotta|                     0.0|                   0.023|                   0.045|                     0.0|                   0.023|                   0.045|
|       shiro|            0.221|0.026898450946643684| [0.0, 0.021, 0.037]|            0.437|0.08386574870912221| [0.0, 0.021, 0.037]|       shiro|                     0.0|                   0.021|                   0.037|                     0.0|                   0.021|                   0.037|
|       eagle|            0.141|0.021073170731707325| [0.0, 0.014, 0.031]|            0.197|0.07302439024390245| [0.0, 0.014, 0.031]|       eagle|                     0.0|                   0.014|                   0.031|                     0.0|                   0.014|                   0.031|
|       hbase|            0.336|0.047520055452864765| [0.0, 0.029, 0.058]|            0.419| 0.0648682994454709| [0.0, 0.029, 0.058]|       hbase|                     0.0|                   0.029|                   0.058|                     0.0|                   0.029|                   0.058|
|  cloudstack|            0.374|0.030660410167236668|[0.011, 0.027, 0.04]|            0.416|0.07643164731754547|[0.011, 0.027, 0.04]|  cloudstack|                   0.011|                   0.027|                    0.04|                   0.011|                   0.027|                    0.04|
|       juddi|            0.302| 0.04106702898550722| [0.0, 0.023, 0.054]|            0.192|0.05783514492753621| [0.0, 0.023, 0.054]|       juddi|                     0.0|                   0.023|                   0.054|                     0.0|                   0.023|                   0.054|
|       spark|            0.297|0.029928716020821203|[0.006, 0.023, 0....|            0.503|0.08106636784268348|[0.006, 0.023, 0....|       spark|                   0.006|                   0.023|                   0.297|                   0.006|                   0.023|                   0.297|
|     phoenix|            0.278|  0.0286186250280833| [0.0, 0.019, 0.278]|            0.604|0.06938373399236117| [0.0, 0.019, 0.278]|     phoenix|                     0.0|                   0.019|                   0.278|                     0.0|                   0.019|                   0.278|
|        bval|            0.129| 0.03530769230769231|[0.005, 0.02, 0.066]|            0.308|0.09284615384615381|[0.005, 0.02, 0.066]|        bval|                   0.005|                    0.02|                   0.066|                   0.005|                    0.02|                   0.066|
|       maven|            0.258|0.027266794319451174|  [0.0, 0.02, 0.042]|            0.405|0.05773400351045161|  [0.0, 0.02, 0.042]|       maven|                     0.0|                    0.02|                   0.042|                     0.0|                    0.02|                   0.042|
|openmeetings|            0.355| 0.03206666666666668|[0.011, 0.026, 0....|            0.351|0.06567819548872181|[0.011, 0.026, 0....|openmeetings|                   0.011|                   0.026|                   0.043|                   0.011|                   0.026|                   0.043|
|         poi|             0.44|0.026662299239222347|  [0.0, 0.02, 0.039]|            0.473|0.09284953508030437|  [0.0, 0.02, 0.039]|         poi|                     0.0|                    0.02|                   0.039|                     0.0|                    0.02|                   0.039|
|        hive|            0.299|0.031159021113243708| [0.0, 0.022, 0.048]|            0.744| 0.0713851247600767| [0.0, 0.022, 0.048]|        hive|                     0.0|                   0.022|                   0.048|                     0.0|                   0.022|                   0.048|
+------------+-----------------+--------------------+--------------------+-----------------+-------------------+--------------------+------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+


In [243]:
apache_people_df.schema


Out[243]:
StructType(List(StructField(username,StringType,false),StructField(extra,StructType(List(StructField(name,StringType,true),StructField(key_fingerprints,ArrayType(StringType,true),true),StructField(urls,ArrayType(StringType,true),true))),true),StructField(projects,ArrayType(StringType,true),true)))

In [74]:
mailing_list_posts_mbox_df_saved.schema


Out[74]:
StructType(List(StructField(project_name,StringType,true),StructField(box_type,StringType,true),StructField(mbox_id,StringType,true),StructField(backend_name,StringType,true),StructField(backend_version,StringType,true),StructField(category,StringType,true),StructField(data,MapType(StringType,StringType,true),true),StructField(origin,StringType,true),StructField(perceval_version,StringType,true),StructField(tag,StringType,true),StructField(timestamp,DoubleType,true),StructField(updated_on,DoubleType,true),StructField(uuid,StringType,true),StructField(from,StringType,true),StructField(from_processed_email,StringType,true),StructField(body,StringType,true),StructField(message_id,StringType,true),StructField(in_reply_to,StringType,true),StructField(content_language,StringType,true)))
Find committers welcome e-mail

In [75]:
#apache_people_df.join(mailing_list_posts_mbox_df_saved,
#                     join_conditions(F.instr(mailing_list_posts_mbox_df_saved)))

Start using some of the lazily created DFs to compute the sample


In [76]:
authors_grouped_by_id_saved.count()


Out[76]:
15440

In [77]:
authors_grouped_by_id_saved.show()


+------------+--------------------+--------------------+--------------------+---------------+-------------+
|project_name|       new_unique_id|              emails|              Author|github_username|latest_commit|
+------------+--------------------+--------------------+--------------------+---------------+-------------+
|    accumulo|  <dhutchis@mit.edu>|[<dhutchis@mit.edu>]|Dylan Hutchison <...|               |   2015-01-02|
|    accumulo|<mario.pastorelli...|[<mario.pastorell...|Mario Pastorelli ...|               |   2015-12-30|
|    activemq|<czobrisky@gmail....|[<czobrisky@gmail...|Chad Zobrisky <cz...|               |   2014-12-31|
|       arrow|<adeneche@dremio....|[<adeneche@dremio...|adeneche <adenech...|               |   2017-01-06|
|       atlas|<amestry@apache.org>|[<amestry@apache....|Ashutosh Mestry <...|               |   2017-01-06|
|       atlas|<venkat@hortonwor...|[<venkat@hortonwo...|Venkat Ranganatha...|               |   2015-01-02|
|      aurora|<ndonatucci@medal...|[<ndonatucci@meda...|Nicolás Donatucci...|               |   2017-01-03|
|      aurora|<sgeorge@twitter....|[<sgeorge@twitter...|Selvin George <sg...|               |   2013-01-05|
|        beam|<cpovirk@google.com>|[<cpovirk@google....|cpovirk <cpovirk@...|               |   2016-01-01|
|        beam|<p.kaczmarczyk@oc...|[<p.kaczmarczyk@o...|Pawel Kaczmarczyk...|               |   2018-01-02|
|      bigtop|<pengwenwu2008@16...|[<pengwenwu2008@1...|Wenwu Peng <pengw...|               |   2014-01-02|
|    brooklyn|  <bostko@gmail.com>|[<bostko@gmail.com>]|Valentin Aitken <...|               |   2014-12-30|
|     calcite|<serhii.harnyk@gm...|[<serhii.harnyk@g...|Serhii-Harnyk <se...|               |   2015-12-29|
|       camel|<Thomas.papke@icw...|[<Thomas.papke@ic...|Thopap <Thomas.pa...|               |   2017-01-03|
|       camel|<jvazquez@tecsisa...|[<jvazquez@tecsis...|juanjovazquez <jv...|               |   2014-01-01|
|       camel|             nkukhar|[<kukhar.n@gmail....|nkukhar <kukhar.n...|        nkukhar|   2015-01-02|
|       camel|        softwaredoug|[<dturnbull@o19s....|Doug Turnbull <dt...|   softwaredoug|   2014-01-04|
|  carbondata|<SRIGOPALMOHANTY@...|[<SRIGOPALMOHANTY...|SRIGOPALMOHANTY <...|               |   2017-01-03|
|  carbondata|<vincent.chenfei@...|[<vincent.chenfei...|vincentchenfei <v...|               |   2017-01-03|
|   cassandra|<MWeiser@ardmoref...|[<MWeiser@ardmore...|Matthias Weiser <...|               |   2018-01-02|
+------------+--------------------+--------------------+--------------------+---------------+-------------+
only showing top 20 rows


In [78]:
num_authors_by_project = authors_grouped_by_id_saved.groupBy("project_name").agg(F.count("Author").alias("author_count"))
num_authors_by_project.cache()
num_authors_by_project.show()


+-------------+------------+
| project_name|author_count|
+-------------+------------+
|         lucy|          14|
|      vxquery|          16|
|    chemistry|          12|
|       roller|          19|
|        geode|         136|
|       falcon|          55|
|trafficserver|         307|
|          tez|          26|
|       pdfbox|          21|
|        httpd|         113|
|   carbondata|         135|
|        celix|          13|
|     accumulo|          95|
|       wicket|          98|
|   servicemix|          19|
|        twill|          38|
|     clerezza|          17|
|      couchdb|         169|
|       bigtop|         139|
|     marmotta|          25|
+-------------+------------+
only showing top 20 rows

Compute the sample %s for each project so we can get reasonable confidence bounds for sampling. Looking at http://veekaybee.github.io/2015/08/04/how-big-of-a-sample-size-do-you-need/


In [79]:
# I think is Cochran's formula scaled for small datasets
@F.udf(IntegerType())
def compute_num_required_sample_1(pop_size):
    import numpy as np
    import scipy.stats
    import math
    e = 0.05
    Z = 1.64 # 90%, 95%: 1.96
    p = 0.5
    N = pop_size
    # CALC SAMPLE SIZE
    n_0 = ((Z**2) * p * (1-p)) / (e**2)
    # ADJUST SAMPLE SIZE FOR FINITE POPULATION
    n = n_0 / (1 + ((n_0 - 1) / float(N)) )
    target = int(math.ceil(n))
    # Compute a fall back size
    fall_back_size = min(3, pop_size)
    return max(fall_back_size, target) # THE SAMPLE SIZE

In [80]:
# Number 2: https://en.wikipedia.org/wiki/Sample_size_determination#Estimation
def walds_method():
    return 1/(0.05**2) # +- 5%
walds_method()


Out[80]:
399.99999999999994

In [81]:
raw_sample_sizes = num_authors_by_project.withColumn(
    "sample_size_1",
    compute_num_required_sample_1("author_count")).persist()

In [82]:
sample_sizes = non_blocking_df_save_or_load(
    raw_sample_sizes,
    "{0}/sample_sizes_10".format(fs_prefix))


Reusing

In [83]:
sample_sizes.show()


+------------+------------+-------------+
|project_name|author_count|sample_size_1|
+------------+------------+-------------+
|spamassassin|          39|           35|
|        oodt|          41|           36|
|     cayenne|          45|           39|
|     streams|          25|           23|
|  jackrabbit|          37|           33|
|  manifoldcf|          19|           18|
|        drat|          29|           27|
|      giraph|          37|           33|
|     cordova|           1|            1|
|      thrift|         237|          127|
|      tomcat|          34|           31|
|     syncope|          31|           28|
|     calcite|         159|          101|
|      impala|         131|           89|
|       nutch|          72|           57|
|    activemq|          90|           68|
|  deltaspike|          56|           47|
|      allura|          84|           65|
|openmeetings|           7|            7|
|       tomee|          47|           41|
+------------+------------+-------------+
only showing top 20 rows


In [84]:
sample_sizes.groupby().agg(F.sum("sample_size_1")).show()


+------------------+
|sum(sample_size_1)|
+------------------+
|              9117|
+------------------+

So this is a bit high to do on a shoestring budget with sampling, but what about if we limit to folks who have recently participated & got rid of projects with limited or no recent participation.


In [85]:
authors_grouped_by_id_saved.schema


Out[85]:
StructType(List(StructField(project_name,StringType,true),StructField(new_unique_id,StringType,true),StructField(emails,ArrayType(StringType,true),true),StructField(Author,StringType,true),StructField(github_username,StringType,true),StructField(latest_commit,DateType,true)))

In [86]:
active_distinct_authors_latest_commit = authors_grouped_by_id_saved.filter(
    (F.date_sub(F.current_date(), 365)) < authors_grouped_by_id_saved.latest_commit)

In [87]:
active_distinct_authors_latest_commit.schema


Out[87]:
StructType(List(StructField(project_name,StringType,true),StructField(new_unique_id,StringType,true),StructField(emails,ArrayType(StringType,true),true),StructField(Author,StringType,true),StructField(github_username,StringType,true),StructField(latest_commit,DateType,true)))

In [88]:
active_distinct_authors_latest_commit.show()
print(active_distinct_authors_latest_commit.count())
active_distinct_authors_latest_commit.take(5)


+------------+--------------------+--------------------+--------------------+---------------+-------------+
|project_name|       new_unique_id|              emails|              Author|github_username|latest_commit|
+------------+--------------------+--------------------+--------------------+---------------+-------------+
|        beam|<p.kaczmarczyk@oc...|[<p.kaczmarczyk@o...|Pawel Kaczmarczyk...|               |   2018-01-02|
|   cassandra|<MWeiser@ardmoref...|[<MWeiser@ardmore...|Matthias Weiser <...|               |   2018-01-02|
|         cxf|<butkovic@gmail.com>|[<butkovic@gmail....|Peter Butkovic <b...|               |   2017-12-31|
|         cxf|<simon.marti@inve...|[<simon.marti@inv...|Simon Marti <simo...|               |   2017-12-31|
|       eagle| <yonzhang@ebay.com>|[<yonzhang@ebay.c...|yonzhang <yonzhan...|               |   2018-01-02|
|        hive| <ganu.ec@gmail.com>|[<ganu.ec@gmail.c...|Ganesha Shreedhar...|               |   2018-01-05|
|       karaf| <ancosen@gmail.com>|[<ancosen@gmail.c...|Andrea Cosentino ...|               |   2018-01-04|
|        kudu|<wdberkeley@apach...|[<wdberkeley@apac...|Will Berkeley <wd...|               |   2018-01-06|
|       nutch|<bvachon@attivio....|[<bvachon@attivio...|Ben Vachon <bvach...|               |   2018-01-05|
|    systemml|             mboehm7|[<mboehm7@gmail.c...|Matthias Boehm <m...|        mboehm7|   2018-01-06|
|       tomee|             AndyGee|[<agumbrecht@tomi...|Andy Gumbrecht <a...|        AndyGee|   2018-01-01|
|    zeppelin|<simonmueller@liv...|[<simonmueller@li...|skymon <simonmuel...|               |   2018-01-05|
|       karaf|              MrEasy|[<r.neubauer@seeb...|Rico Neubauer <r....|         MrEasy|   2017-12-31|
|       arrow|<brian.hulette@cc...|[<brian.hulette@c...|Brian Hulette <br...|               |   2018-01-05|
|       camel|           davsclaus|[<claus.ibsen@gma...|Claus Ibsen <clau...|      davsclaus|   2018-01-06|
|  cloudstack|<the.evergreen@gm...|[<the.evergreen@g...|Frank Maximus <th...|               |   2018-01-05|
|        fluo|<furkankamaci@gma...|[<furkankamaci@gm...|Furkan KAMACI <fu...|               |   2018-01-02|
|      hadoop| <epayne@apache.org>|[<epayne@apache.o...|Eric Payne <epayn...|               |   2018-01-05|
|        kudu|<anjuwong@g.ucla....|[<anjuwong@g.ucla...|Andrew Wong <anju...|               |   2018-01-04|
|        kudu|<sailesh@apache.org>|[<sailesh@apache....|Sailesh Mukil <sa...|               |   2018-01-04|
+------------+--------------------+--------------------+--------------------+---------------+-------------+
only showing top 20 rows

2240
Out[88]:
[Row(project_name='beam', new_unique_id='<p.kaczmarczyk@ocado.com>', emails=['<p.kaczmarczyk@ocado.com>'], Author='Pawel Kaczmarczyk <p.kaczmarczyk@ocado.com>', github_username='', latest_commit=datetime.date(2018, 1, 2)),
 Row(project_name='cassandra', new_unique_id='<MWeiser@ardmorefinancial.com>', emails=['<MWeiser@ardmorefinancial.com>'], Author='Matthias Weiser <MWeiser@ardmorefinancial.com>', github_username='', latest_commit=datetime.date(2018, 1, 2)),
 Row(project_name='cxf', new_unique_id='<butkovic@gmail.com>', emails=['<butkovic@gmail.com>'], Author='Peter Butkovic <butkovic@gmail.com>', github_username='', latest_commit=datetime.date(2017, 12, 31)),
 Row(project_name='cxf', new_unique_id='<simon.marti@inventage.com>', emails=['<simon.marti@inventage.com>'], Author='Simon Marti <simon.marti@inventage.com>', github_username='', latest_commit=datetime.date(2017, 12, 31)),
 Row(project_name='eagle', new_unique_id='<yonzhang@ebay.com>', emails=['<yonzhang@ebay.com>'], Author='yonzhang <yonzhang@ebay.com>', github_username='', latest_commit=datetime.date(2018, 1, 2))]

In [89]:
active_num_authors_by_project = active_distinct_authors_latest_commit.groupBy("project_name").agg(F.count("Author").alias("author_count"))
active_num_authors_by_project.cache()
active_num_authors_by_project.show()


+-------------+------------+
| project_name|author_count|
+-------------+------------+
|         lucy|           2|
|       roller|           1|
|        geode|          47|
|       falcon|           4|
|trafficserver|          35|
|          tez|           6|
|       pdfbox|           4|
|        httpd|          14|
|   carbondata|          33|
|        celix|           6|
|       wicket|          14|
|     accumulo|          12|
|        twill|           2|
|      couchdb|          22|
|       bigtop|          12|
|     marmotta|           2|
|          vcl|           1|
|   freemarker|           4|
|       buildr|           2|
|        shiro|           2|
+-------------+------------+
only showing top 20 rows


In [90]:
active_raw_sample_sizes = active_num_authors_by_project.withColumn(
    "sample_size_1",
    compute_num_required_sample_1("author_count")).persist()

In [91]:
active_sample_sizes = non_blocking_df_save_or_load_csv(
    active_raw_sample_sizes,
    "{0}/active_sample_sizes_13".format(fs_prefix))


Reusing

In [92]:
filtered_active_sample_sizes = active_sample_sizes.filter(
    active_sample_sizes.sample_size_1 > 10).persist()

In [93]:
filtered_active_sample_sizes.groupby().agg(F.sum("sample_size_1")).show()


+------------------+
|sum(sample_size_1)|
+------------------+
|              1605|
+------------------+

That's probably ok, lets go ahead and compute the sample set for each project


In [94]:
sample_fractions = filtered_active_sample_sizes.withColumn(
    "sample_fraction",
    (filtered_active_sample_sizes.sample_size_1+0.5) / filtered_active_sample_sizes.author_count)

In [95]:
local_sample_fractions = sample_fractions.select(
    sample_fractions.project_name, sample_fractions.sample_fraction).collect()

In [96]:
local_sample_fractions


Out[96]:
[Row(project_name='impala', sample_fraction=0.8928571428571429),
 Row(project_name='calcite', sample_fraction=0.9782608695652174),
 Row(project_name='nutch', sample_fraction=1.03125),
 Row(project_name='thrift', sample_fraction=0.9482758620689655),
 Row(project_name='drill', sample_fraction=0.94),
 Row(project_name='trafficserver', sample_fraction=0.9285714285714286),
 Row(project_name='accumulo', sample_fraction=1.0416666666666667),
 Row(project_name='wicket', sample_fraction=1.0357142857142858),
 Row(project_name='subversion', sample_fraction=1.0357142857142858),
 Row(project_name='groovy', sample_fraction=1.0416666666666667),
 Row(project_name='phoenix', sample_fraction=0.9782608695652174),
 Row(project_name='storm', sample_fraction=0.95),
 Row(project_name='airavata', sample_fraction=0.9782608695652174),
 Row(project_name='asterixdb', sample_fraction=1.0416666666666667),
 Row(project_name='zeppelin', sample_fraction=0.9761904761904762),
 Row(project_name='ignite', sample_fraction=0.7611111111111111),
 Row(project_name='camel', sample_fraction=0.823076923076923),
 Row(project_name='arrow', sample_fraction=0.87),
 Row(project_name='beam', sample_fraction=0.75),
 Row(project_name='metron', sample_fraction=1.0357142857142858),
 Row(project_name='geode', sample_fraction=0.8829787234042553),
 Row(project_name='spark', sample_fraction=0.7008196721311475),
 Row(project_name='ranger', sample_fraction=0.975),
 Row(project_name='tika', sample_fraction=1.0454545454545454),
 Row(project_name='kylin', sample_fraction=0.9054054054054054),
 Row(project_name='ambari', sample_fraction=0.7911392405063291),
 Row(project_name='samza', sample_fraction=0.9772727272727273),
 Row(project_name='carbondata', sample_fraction=0.9242424242424242),
 Row(project_name='bookkeeper', sample_fraction=0.94),
 Row(project_name='cloudstack', sample_fraction=0.9027777777777778),
 Row(project_name='zookeeper', sample_fraction=1.0384615384615385),
 Row(project_name='tinkerpop', sample_fraction=1.03125),
 Row(project_name='trafodion', sample_fraction=0.9444444444444444),
 Row(project_name='cassandra', sample_fraction=0.875),
 Row(project_name='libcloud', sample_fraction=0.9772727272727273),
 Row(project_name='notebook', sample_fraction=0.9242424242424242),
 Row(project_name='couchdb', sample_fraction=0.9772727272727273),
 Row(project_name='nteract', sample_fraction=0.9722222222222222),
 Row(project_name='bigtop', sample_fraction=1.0416666666666667),
 Row(project_name='struts', sample_fraction=1.0454545454545454),
 Row(project_name='hadoop', sample_fraction=0.764367816091954),
 Row(project_name='httpd', sample_fraction=1.0357142857142858),
 Row(project_name='hbase', sample_fraction=0.8188405797101449),
 Row(project_name='kafka', sample_fraction=0.7526315789473684),
 Row(project_name='karaf', sample_fraction=1.0294117647058822),
 Row(project_name='atlas', sample_fraction=1.0357142857142858),
 Row(project_name='flink', sample_fraction=0.823076923076923),
 Row(project_name='mesos', sample_fraction=0.8557692307692307),
 Row(project_name='kudu', sample_fraction=0.9791666666666666),
 Row(project_name='nifi', sample_fraction=0.8777777777777778),
 Row(project_name='hive', sample_fraction=0.8584905660377359),
 Row(project_name='cxf', sample_fraction=0.9482758620689655),
 Row(project_name='orc', sample_fraction=1.0357142857142858)]

In [97]:
sampled_authors = active_distinct_authors_latest_commit.sampleBy(
    "project_name",
    fractions=dict(map(lambda r: (r[0], min(1.0, r[1])), local_sample_fractions)),
    seed=42)

In [ ]:


In [98]:
sampled_authors_saved = non_blocking_df_save_or_load(
    sampled_authors, "{0}/sampled_authors_6".format(fs_prefix)).alias("sampled_authors")


Reusing

In [99]:
sampled_authors_saved.show()


+------------+--------------------+--------------------+--------------------+---------------+-------------+
|project_name|       new_unique_id|              emails|              Author|github_username|latest_commit|
+------------+--------------------+--------------------+--------------------+---------------+-------------+
|      ambari|<adoroszlai@apach...|[<adoroszlai@apac...|Doroszlai, Attila...|               |   2018-01-05|
|       arrow|           JinHai-CN|[<haijin.chn@gmai...|Jin Hai <haijin.c...|      JinHai-CN|   2018-01-03|
|     calcite|<maryann.xue@gmai...|[<maryann.xue@gma...|maryannxue <marya...|               |   2018-01-04|
|       camel|             gautric|[<gautric@redhat....|gautric <gautric@...|        gautric|   2017-12-31|
|  carbondata|<praveenmeenakshi...|[<praveenmeenaksh...|praveenmeenakshi5...|               |   2018-01-04|
|  cloudstack|<372575+khos2ow@u...|[<372575+khos2ow@...|Khosrow Moossavi ...|               |   2018-01-02|
|     couchdb|<alexander@spotme...|[<alexander@spotm...|AlexanderKarabero...|               |   2018-01-05|
|      groovy|<jwagenleitner@ap...|[<jwagenleitner@a...|John Wagenleitner...|               |   2018-01-01|
|      ignite|<ilantukh@gridgai...|[<ilantukh@gridga...|Ilya Lantukh <ila...|               |   2018-01-05|
|      ignite|<pivanov@gridgain...|[<pivanov@gridgai...|Ivanov Petr <piva...|               |   2018-01-03|
|      impala|<dknupp@cloudera....|[<dknupp@cloudera...|David Knupp <dknu...|               |   2018-01-05|
|       karaf|    <fpa@openrun.re>|  [<fpa@openrun.re>]|Francois Papon <f...|               |   2018-01-05|
|       kylin|    <ganma@ebay.com>|  [<ganma@ebay.com>]|Ma,Gang <ganma@eb...|               |   2018-01-03|
|        nifi| <bbende@apache.org>|[<bbende@apache.o...|Bryan Bende <bben...|               |   2018-01-05|
|       samza|<tstumpges@ntent....|[<tstumpges@ntent...|thunderstumpges <...|               |   2018-01-01|
|      struts|<zalsaeed@cs.uore...|[<zalsaeed@cs.uor...|zalsaeed <zalsaee...|               |   2018-01-01|
|        tika|<nassif.lfcn@dpf....|[<nassif.lfcn@dpf...|Nassif <nassif.lf...|               |   2018-01-04|
|      ambari|<vsuvagia@hortonw...|[<vsuvagia@horton...|Vishal Suvagia <v...|               |   2018-01-05|
|        beam|<alan@Alans-MacBo...|[<alan@Alans-MacB...|Alan Myrvold <ala...|               |   2018-01-03|
|        beam|<petr.shevtsov@gm...|[<petr.shevtsov@g...|Petr Shevtsov <pe...|               |   2018-01-05|
+------------+--------------------+--------------------+--------------------+---------------+-------------+
only showing top 20 rows


In [100]:
sampled_authors_saved.count()


Out[100]:
1591

In [ ]:


In [101]:
sampled_authors_saved.schema


Out[101]:
StructType(List(StructField(project_name,StringType,true),StructField(new_unique_id,StringType,true),StructField(emails,ArrayType(StringType,true),true),StructField(Author,StringType,true),StructField(github_username,StringType,true),StructField(latest_commit,DateType,true)))

In [102]:
sampled_authors_grouped_by_author_id = sampled_authors_saved.groupBy(
    sampled_authors_saved.new_unique_id).agg(
    F.collect_set(sampled_authors_saved.project_name).alias("projects"),
    F.first(sampled_authors_saved.emails).alias("emails"),
    F.first(sampled_authors_saved.Author).alias("Author"),
    F.first(sampled_authors_saved.github_username).alias("github_username"))
sampled_authors_grouped_by_author_id_flattened = sampled_authors_grouped_by_author_id.select(
    "new_unique_id",
    F.concat_ws(' ', sampled_authors_grouped_by_author_id.projects).alias("projects"),
    "emails",
    "Author",
    "github_username")

In [103]:
sampled_authors_grouped_by_author_id_flattened.show()


+--------------------+------------+--------------------+--------------------+---------------+
|       new_unique_id|    projects|              emails|              Author|github_username|
+--------------------+------------+--------------------+--------------------+---------------+
|<yuzhong.yz@aliba...|     calcite|[<yuzhong.yz@alib...|yuzhong <yuzhong....|               |
|<vandana.yadav759...|  carbondata|[<vandana.yadav75...|vandana <vandana....|               |
|<vozerov@gridgain...|      ignite|[<vozerov@gridgai...|devozerov <vozero...|               |
|<jcustenborder@gm...|       kafka|[<jcustenborder@g...|Jeremy Custenbord...|               |
| <jdeppe@pivotal.io>|       geode|[<jdeppe@pivotal....|Jens Deppe <jdepp...|               |
|<30875507+ashwini...|   tinkerpop|[<30875507+ashwin...|Ashwini Singh <30...|               |
|              yew1eb|       flink|[<yew1eb@gmail.com>]|yew1eb <yew1eb@gm...|         yew1eb|
| <b.lerer@gmail.com>|   cassandra|[<b.lerer@gmail.c...|Benjamin Lerer <b...|               |
|<ehsan.totoni@int...|       arrow|[<ehsan.totoni@in...|Ehsan Totoni <ehs...|               |
|          anoopsjohn|       hbase|[<anoopsamjohn@gm...|anoopsamjohn <ano...|     anoopsjohn|
|<zhaijia@apache.org>|  bookkeeper|[<zhaijia@apache....|Jia Zhai <zhaijia...|               |
|<progers@maprtech...|       drill|[<progers@maprtec...|Paul Rogers <prog...|               |
|<lizhou.gao@zilli...|       arrow|[<lizhou.gao@zill...|Lizhou Gao <lizho...|               |
|<rrussell@adobe.com>|       arrow|[<rrussell@adobe....|rrussell <rrussel...|               |
|<cchampeau@apache...|      groovy|[<cchampeau@apach...|Cedric Champeau <...|               |
|           ArnaudFnr|        beam|[<arnaudfournier9...|Arnaud <arnaudfou...|      ArnaudFnr|
|               PnPie|       kafka|[<yu.liu003@gmail...|Yu <yu.liu003@gma...|          PnPie|
|<cao.xuewen@zte.c...|       spark|[<cao.xuewen@zte....|caoxuewen <cao.xu...|               |
|             bgaborg|hbase hadoop|[<gabor.bota@clou...|Gabor Bota <gabor...|        bgaborg|
|    <jhc@apache.org>|      hadoop|  [<jhc@apache.org>]|James Clampffer <...|               |
+--------------------+------------+--------------------+--------------------+---------------+
only showing top 20 rows


In [104]:
sampled_authors_grouped_by_author_id_flattened.cache()
sampled_authors_grouped_by_author_id_flattened.count()


Out[104]:
1535

Join the sampled authors with the e-mails on the dev list and find the top 3 most recent responses


In [ ]:


In [105]:
mailing_list_posts_mbox_df_saved.printSchema()


root
 |-- project_name: string (nullable = true)
 |-- box_type: string (nullable = true)
 |-- mbox_id: string (nullable = true)
 |-- backend_name: string (nullable = true)
 |-- backend_version: string (nullable = true)
 |-- category: string (nullable = true)
 |-- data: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- origin: string (nullable = true)
 |-- perceval_version: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: double (nullable = true)
 |-- updated_on: double (nullable = true)
 |-- uuid: string (nullable = true)
 |-- from: string (nullable = true)
 |-- from_processed_email: string (nullable = true)
 |-- body: string (nullable = true)
 |-- message_id: string (nullable = true)
 |-- in_reply_to: string (nullable = true)
 |-- content_language: string (nullable = true)


In [106]:
sampled_authors_grouped_by_author_id_flattened.schema


Out[106]:
StructType(List(StructField(new_unique_id,StringType,true),StructField(projects,StringType,false),StructField(emails,ArrayType(StringType,true),true),StructField(Author,StringType,true),StructField(github_username,StringType,true)))

In [107]:
def extract_posts_by_authors(authors):
    join_conditions = [
        #sampled_authors_saved.project_name == mailing_list_posts_mbox_df_saved.project_name,
        F.expr("array_contains(emails, from_processed_email)")]
    return authors.join(mailing_list_posts_mbox_df_saved, join_conditions).select(
        "message_id", "new_unique_id").alias("posts_by_sampled_authors")

posts_by_sampled_authors = extract_posts_by_authors(
    sampled_authors_grouped_by_author_id_flattened).alias(
    "posts_by_sampled_authors").cache()

In [108]:
posts_by_sampled_authors_saved = non_blocking_df_save_or_load(
    posts_by_sampled_authors, "{0}/posts_by_sampled_authors_5".format(fs_prefix)).alias("posts_by_sampled_authors")


Reusing

In [109]:
posts_by_sampled_authors_saved.schema


Out[109]:
StructType(List(StructField(message_id,StringType,true),StructField(new_unique_id,StringType,true)))

In [110]:
mailing_list_posts_in_reply_to = mailing_list_posts_mbox_df_saved.filter(
    mailing_list_posts_mbox_df_saved.in_reply_to.isNotNull()).alias("mailing_list_posts_in_reply_to")

In [ ]:


In [111]:
def first_5k_chars(tokens):
    return tokens[0:5000]

first_5k_chars_udf = UserDefinedFunction(
    first_5k_chars,
    StringType(),
    "first_5k_chars")

In [112]:
def contains_pronoun(tokens):
    common_pronouns = ["they", "ze", "he", "she", "her", "his", "their"]
    return any(pronoun in tokens for pronoun in common_pronouns)

contains_pronoun_udf = UserDefinedFunction(
    contains_pronoun,
    BooleanType(),
    "contains_pronoun")

In [113]:
def relevant_posts_with_replies(mailing_list_posts_in_reply_to, posts):
    
    posts_with_replies = posts.join(
        mailing_list_posts_in_reply_to,
        [F.col("mailing_list_posts_in_reply_to.in_reply_to") == posts.message_id],
        "inner")
    posts_in_response_to_user = posts_with_replies.select(
        posts_with_replies.new_unique_id,
        posts_with_replies.timestamp,
        posts_with_replies.project_name,
        posts_with_replies.body.alias("orig_body"),
        first_5k_chars_udf(posts_with_replies.body).alias("body"))
    posts_in_response_to_user.cache()
    
    from sparklingml.feature.python_pipelines import SpacyTokenizeTransformer
    spacy_tokenizer = SpacyTokenizeTransformer(inputCol="body", outputCol="body_tokens")
    
    posts_in_response_to_user_tokenized = spacy_tokenizer.transform(
        posts_in_response_to_user)
    posts_in_response_to_user_tokenized.cache()
    # Need to break the chain... its not great.
    
    posts_in_response_to_user_with_pronouns = posts_in_response_to_user_tokenized.filter(
        contains_pronoun_udf(posts_in_response_to_user_tokenized.body_tokens))
    # nyet
    posts_in_response_to_user_with_pronouns.cache()
    #return posts_in_response_to_user_with_pronouns

    posts_in_response_to_user_grouped = posts_in_response_to_user_with_pronouns.orderBy(
        posts_in_response_to_user.timestamp).groupBy(
        posts_in_response_to_user.new_unique_id)
    posts_in_response_to_user_collected = posts_in_response_to_user_grouped.agg(
        F.collect_list(posts_in_response_to_user_with_pronouns.body).alias("emails"))
    # nyet
    return posts_in_response_to_user_collected

In [114]:
mailing_list_posts_in_reply_to.cache().count()
posts_by_sampled_authors.cache().count()
posts_in_response_to_user_collected = relevant_posts_with_replies(
    mailing_list_posts_in_reply_to, posts_by_sampled_authors)
#posts_in_response_to_user_collected.first()
posts_in_response_to_user_collected_saved = non_blocking_df_save_or_load(
    posts_in_response_to_user_collected,
    "{0}/posts_by_user_9".format(fs_prefix))


Using backing jar /sparklingml/sparklingml/../target/scala-2.11/sparklingml-assembly-0.0.1-SNAPSHOT.jar
/opt/conda/lib/python3.6/importlib/_bootstrap.py:219: RuntimeWarning: numpy.dtype size changed, may indicate binary incompatibility. Expected 96, got 88
  return f(*args, **kwds)
/opt/conda/lib/python3.6/importlib/_bootstrap.py:219: RuntimeWarning: numpy.ufunc size changed, may indicate binary incompatibility. Expected 192, got 176
  return f(*args, **kwds)
Reusing

In [115]:
posts_in_response_to_user_collected_saved.printSchema()


root
 |-- new_unique_id: string (nullable = true)
 |-- emails: array (nullable = true)
 |    |-- element: string (containsNull = true)


In [116]:
posts_in_response_to_user_collected.show()


+--------------------+--------------------+
|       new_unique_id|              emails|
+--------------------+--------------------+
|<vozerov@gridgain...|[{plain=Hi,

I go...|
|<jcustenborder@gm...|[{plain=On 06/07/...|
...|
|              yew1eb|[{plain=Congrats ...|
|<cchampeau@apache...|[{plain=On 09.04....|
|           ArnaudFnr|[{plain=This is a...|
|<progers@maprtech...|[{plain=Just thou...|
|<zhaijia@apache.org>|[{plain=+1 (non b...|
| <nreich@pivotal.io>|[{plain=The packa...|
|         qianzhangxa|[{plain=I comment...|
|<gaston@mesospher...|[{plain=The ASF m...|
|<ssainath@hortonw...|[{plain=Congratul...|
|<peng.jianhua@zte...|[{plain=
--------...|
|<awong@cloudera.com>|[{plain=>
> How d...|
|<sboikov@apache.org>|[{plain=I don't t...|
|<mgergely@hortonw...|[{plain=Thanks Mi...|
|<ekl@databricks.com>|[{plain=Hi Eric,
...|
|           hachikuji|[{plain=I agree w...|
|<vovatkach75@gmai...|[{plain=I’d be op...|
|               sijie|[{plain=If I reme...|
+--------------------+--------------------+
only showing top 20 rows


In [117]:
#posts_in_response_to_user_collected.count()

In [118]:
sampled_authors_saved.filter(sampled_authors_saved.new_unique_id == "").show()


+------------+-------------+------+------+---------------+-------------+
|project_name|new_unique_id|emails|Author|github_username|latest_commit|
+------------+-------------+------+------+---------------+-------------+
+------------+-------------+------+------+---------------+-------------+


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:

Write a sample for analysis


In [119]:
def first10(posts):
    return posts[0:10]
first10_udf = UserDefinedFunction(
    first10,
    ArrayType(StringType()),
    "first10")

In [120]:
top_posts_for_user = posts_in_response_to_user_collected_saved.select(
    posts_in_response_to_user_collected_saved.new_unique_id,
    first10_udf(posts_in_response_to_user_collected_saved.emails).alias("top10emails")).alias("top_posts_for_user")

In [121]:
joined_sample = sampled_authors_saved.join(
    top_posts_for_user,
    top_posts_for_user.new_unique_id == sampled_authors_saved.new_unique_id,
    "LEFT_OUTER").select(
    "project_name",
    F.col("sampled_authors.new_unique_id").alias("id"),
    "Author",
    "github_username",
    "top10emails")

In [122]:
joined_sample_saved = non_blocking_df_save_or_load(
    joined_sample,
    "{0}/joined_sample_3".format(fs_prefix)).alias("joined_sample")


Reusing

In [123]:
joined_sample_saved.show()


+------------+--------------------+--------------------+---------------+--------------------+
|project_name|                  id|              Author|github_username|         top10emails|
+------------+--------------------+--------------------+---------------+--------------------+
|      bigtop|  <530590615@qq.com>|zhtisi <530590615...|               |                null|
|       arrow|<antoine@python.org>|Antoine Pitrou <a...|               |[{plain=My feelin...|
|       nutch|<bvachon@attivio....|Ben Vachon <bvach...|               |[{plain=Hi Ben

O...|
|    notebook|<danalee96@gmail....|danagilliann <dan...|               |                null|
|        hive| <kb.pcre@gmail.com>|Bertalan Kondrat ...|               |                null|
|      ambari|<smolnar@hortonwo...|Sandor Molnar <sm...|               |[{plain=+1

On ...|
|       hbase|<swilson@siftscie...|Scott Wilson <swi...|               |                null|
|       hbase|<tianjy1990@gmail...|tianjingyun <tian...|               |                null|
|       drill|<vitalii.diravka@...|Vitalii Diravka <...|               |[{plain=sounds go...|
|     calcite|<vitalii.diravka@...|Vitalii Diravka <...|               |[{plain=sounds go...|
|       camel|            cunningt|Tom Cunningham <t...|       cunningt|[{plain=Tom, not ...|
|         cxf|               jimma|Jim Ma <ema@apach...|          jimma|                null|
|        nifi|       ottobackwards|Otto Fowler <otto...|  ottobackwards|[{plain=Note:  Gr...|
|       storm|             ptgoetz|P. Taylor Goetz <...|        ptgoetz|[{plain=Thanks! T...|
|       flink|            tzulitai|Tzu-Li (Gordon) T...|       tzulitai|[{plain=Hi Gordon...|
|        hive|  <daijyc@gmail.com>|Daniel Dai <daijy...|               |[{plain=

> On No...|
|     calcite| <ebegoli@gmail.com>|Edmon Begoli <ebe...|               |[{plain=Edmon,

I...|
|       geode|<jbarrett@pivotal...|Jacob Barrett <jb...|               |[{plain=After fur...|
|       httpd| <rpluem@apache.org>|Ruediger Pluem <r...|               |[{plain=On Sun, 0...|
|       nutch|<yossi.tamari@pip...|Yossi Tamari <yos...|               |[{plain=You shoul...|
+------------+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows


In [124]:
@F.pandas_udf(StringType())
def create_email_snippet(emails):
    import os
    def create_snippet(email):
        if email is None:
            return email
        #result = email.replace(os.linesep, ' ')
        # IDK but seems required for the other system :(
        #result = result.replace('\n', ' ')
        #result = result.replace('\r', ' ')
        #result = result.replace(',', ' ')
        #result = result.replace(',', ' ')
        result = email
        import string
        printable = set(string.printable)
        result = ''.join(filter(lambda x: x in printable, result))
        if len(result) > 500:
            result = result[0:500] + "..... e-mail condensed for readability"
        return result
    return emails.apply(create_snippet)

In [125]:
sc.getConf().get("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version")

In [126]:
formatted_sample = joined_sample_saved.select(
    "project_name",
    "id",
    "Author",
    "github_username",
    create_email_snippet(joined_sample_saved.top10emails[0]).alias("email0"),
    create_email_snippet(joined_sample_saved.top10emails[1]).alias("email1"),
    create_email_snippet(joined_sample_saved.top10emails[2]).alias("email2")).cache().repartition(10)

In [127]:
formatted_sample.show()


+-------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+
| project_name|                  id|              Author|github_username|              email0|              email1|              email2|
+-------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+
|   cloudstack|<daan.hoogland@sh...|Daan Hoogland <da...|               |{plain=So here is...|{plain=You unders...|{plain=Will great...|
|          cxf|<freeman.fang@gma...|Freeman Fang <fre...|               |{plain=Hi Freeman...|{plain=+1

On Thu...|{plain=+1

On Thu...|
|    cassandra|    <mck@apache.org>|Mick Semb Wever <...|               |{plain=On 04/12/2...|{plain=On 12-09-0...|{plain=On Wed, 20...|
|        hbase|<thiruvel@gmail.com>|Thiruvel Thirumoo...|               |                null|                null|                null|
|       groovy|<nikchugunov@gith...|Nikolay Chugunov ...|               |                null|                null|                null|
|        camel|       <rovo@gmx.at>|Roman Vottner <ro...|               |                null|                null|                null|
|        camel|<sebastien.beaume...|BEAUME <sebastien...|               |                null|                null|                null|
|        samza| <navina@apache.org>|navina <navina@ap...|               |{plain=Hey Navina...|{plain=Thanks muc...|{plain=Hey Navina...|
|       impala|   <lv@cloudera.com>|Lars Volker <lv@c...|               |{plain=+1

Obviou...|{plain=@Lars Volk...|{plain=That looks...|
|        spark|<marcogaido91@gma...|Marco Gaido <marc...|               |{plain=Thank you ...|{plain=Congrats a...|{plain=Java 9/10 ...|
|        drill|<rajrahul@gmail.com>|Rahul Raj <rajrah...|               |{plain=Have you l...|{plain=I was sugg...|                null|
|   bookkeeper|<cguttapalem@sale...|cguttapalem <cgut...|               |                null|                null|                null|
|        hbase|<weichiu@cloudera...|Wei-Chiu Chuang <...|               |{plain=Thanks for...|{plain=>From TD p...|{plain=Not yet.  ...|
|        kafka|                dguy|Damian Guy <damia...|           dguy|{plain=Thanks Dam...|{plain=Damian,

T...|{plain=>> The key...|
|trafficserver|            shinrich|Susan Hinrichs <s...|       shinrich|{plain=All right,...|{plain=Originally...|                null|
|        kafka|<asutosh.pandya@h...|asutosh936 <asuto...|               |                null|                null|                null|
|        geode| <jiliao@pivotal.io>|jinmeiliao <jilia...|               |{plain=

> On Jun...|{plain=fix for th...|{plain=
---------...|
|   bookkeeper|           eolivelli|Enrico Olivelli <...|      eolivelli|{plain=Enrico,

C...|{plain=>>but ther...|{plain=>>maybe yo...|
|        karaf|<luke@code-house....|Łukasz Dywicki <l...|               |{plain=On Tuesday...|{plain=Yes. I thi...|{plain=2011/1/20 ...|
|       thrift|<kerri.devine@sie...|Kerri Devine <ker...|               |                null|                null|                null|
+-------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+
only showing top 20 rows


In [128]:
formatted_sample_pq_saved = non_blocking_df_save_or_load(
    formatted_sample,
    "{0}/formatted_sample_pq_11".format(fs_prefix))


Reusing

In [129]:
def html_escape(raw_string):
    import html
    if raw_string is None:
        return raw_string
    initial_escape = html.escape(raw_string)
    return initial_escape.replace(os.linesep, '<br>').replace('\n', '').replace('\r', '')
html_escape_udf = UserDefinedFunction(
      html_escape,
      StringType(),
      "html_escape_udf")

In [130]:
escaped = formatted_sample_pq_saved.select(
    list(map(lambda col: html_escape_udf(col).alias(col), formatted_sample_pq_saved.columns)))
formatted_sample_csv_saved = non_blocking_df_save_or_load_csv(
    escaped,
    "{0}/formatted_sample_csv_14".format(fs_prefix))


Reusing

In [131]:
projects = formatted_sample_pq_saved.groupBy(
    formatted_sample_pq_saved.project_name).agg(F.first("project_name")).select("project_name")

In [132]:
projects


Out[132]:
DataFrame[project_name: string]

In [133]:
projects_csv = non_blocking_df_save_or_load_csv(projects, "{0}/projects".format(fs_prefix))


Reusing

In [134]:
projects_csv.count()


Out[134]:
53

Load back the human processed data & process the columns into formats that are easier with Spark


In [135]:
def rewrite_human_data(df):
    columns = df.columns
    candidates_to_select = filter(lambda column: "Input" in column or "Answer" in column, columns)

    def easy_name(column_name):
        return column_name.replace(".", "_")
    
    rewrite_literals = map(
        lambda column_name: F.col("`{0}`".format(column_name)).alias(easy_name(column_name)),
        candidates_to_select)
    return df.select(*list(rewrite_literals))

In [136]:
project_human_raw_df = session.read.format("csv").option("header", "true") \
                .option("inferSchema", "true").load(
    "{0}/human_data/projects".format(fs_prefix))

In [137]:
project_human_df = non_blocking_df_save_or_load(
    rewrite_human_data(project_human_raw_df),
    "{0}/human_data_cleaned/projects".format(fs_prefix))


Reusing

In [138]:
sampled_contirbutors_human_raw_df = session.read.format("csv").option("header", "true") \
                .option("inferSchema", "true").load(
    "{0}/human_data/sampled_contirbutors".format(fs_prefix))

In [139]:
sampled_contirbutors_human_df = non_blocking_df_save_or_load(
    rewrite_human_data(sampled_contirbutors_human_raw_df),
    "{0}/human_data_cleaned/sampled_contributors".format(fs_prefix))


Reusing

In [140]:
asf_people_human_raw_df = session.read.format("csv").option("header", "true") \
                .option("inferSchema", "true").load(
    "{0}/human_data/asf_people".format(fs_prefix))

In [141]:
asf_people_human_df = non_blocking_df_save_or_load(
    rewrite_human_data(asf_people_human_raw_df),
    "{0}/human_data_cleaned/asf_people".format(fs_prefix))


Reusing

In [142]:
project_human_df.show()


+-------------+----------------------+---------------------------------+----------------------+---------------------------------+-------------------------+------------------------------------+--------------------+----------------------+---------------------------------+
|Input_project|Answer_code_of_conduct|Answer_code_of_conduct_difficulty|Answer_committer_guide|Answer_committer_guide_difficulty|Answer_contributing_guide|Answer_contributing_guide_difficulty|     Answer_feedback|Answer_mentoring_guide|Answer_mentoring_guide_difficulty|
+-------------+----------------------+---------------------------------+----------------------+---------------------------------+-------------------------+------------------------------------+--------------------+----------------------+---------------------------------+
|     accumulo|                  none|                             hard|  https://accumulo....|                             easy|     https://accumulo....|                                easy|                  NA|                  none|                             hard|
|     airavata|                  none|                             hard|                  none|                             hard|     https://airavata....|                                easy|                  NA|                  none|                             hard|
|       ambari|                  none|                             hard|                  none|                             hard|                     none|                                hard|                None|                  none|                             hard|
|        arrow|                  none|                             hard|  https://arrow.apa...|                             easy|     https://arrow.apa...|                                easy|                  NA|                  none|                             hard|
|    asterixdb|                  none|                             hard|  https://asterixdb...|                             easy|     https://asterixdb...|                                easy|It will be great ...|                  none|                             hard|
|        atlas|                  none|                             hard|                  none|                             hard|                     none|                                hard|                  NA|                  none|                             hard|
|         beam|                  none|                             hard|  https://beam.apac...|                             easy|     https://beam.apac...|                                easy|                  NA|                  none|                             hard|
|       bigtop|                  none|                             hard|  https://cwiki.apa...|                             easy|     https://cwiki.apa...|                                easy|                  NA|                  none|                             hard|
|   bookkeeper|  https://bookkeepe...|                             Easy|                  none|                             hard|     https://bookkeepe...|                                easy|                none|                  none|                             hard|
|   bookkeeper|                  none|                             hard|  https://bookkeepe...|                             easy|     https://bookkeepe...|                                easy|                  NA|                  none|                             hard|
|      calcite|                  none|                             hard|  https://community...|                             easy|     https://calcite.a...|                                easy|                  NA|                  none|                             hard|
|        camel|                  none|                             hard|  http://camel.apac...|                             easy|     http://camel.apac...|                                easy|                  NA|                  none|                             hard|
|   carbondata|                  none|                             hard|  https://carbondat...|                             easy|     https://github.co...|                                easy|           no issues|                  none|                             hard|
|    cassandra|  https://whimsy.ap...|                             easy|  https://wiki.apac...|                             easy|     https://wiki.apac...|                                easy|           no issues|                  none|                             hard|
|   cloudstack|                  none|                             hard|                  none|                             hard|                     none|                                hard|Perhaps you could...|                  none|                             hard|
|      couchdb|  http://couchdb.ap...|                             easy|                  none|                             hard|     http://couchdb.ap...|                                easy|                  NA|                  none|                             hard|
|          cxf|                  none|                             hard|                  none|                             hard|     http://cxf.apache...|                                easy|                  NA|                  none|                             hard|
|        drill|                  none|                             hard|                  none|                             hard|     https://drill.apa...|                                easy|                  NA|                  none|                             hard|
|        flink|                  none|                             hard|  https://flink.apa...|                             easy|     https://flink.apa...|                                easy|                None|                  none|                             hard|
|        geode|  https://cwiki-tes...|                             easy|  http://incubator....|                             easy|     https://cwiki-tes...|                                easy|           no issues|  https://cwiki.apa...|                             easy|
+-------------+----------------------+---------------------------------+----------------------+---------------------------------+-------------------------+------------------------------------+--------------------+----------------------+---------------------------------+
only showing top 20 rows


In [143]:
def clean_maybe_link(col):
    if col is None:
        return None
    cleaned_ish = col.lower()
    if cleaned_ish == "none" or cleaned_ish == "na":
        return None
    if "http://" in cleaned_ish or "https://" in cleaned_ish:
        return cleaned_ish
    else:
        return None

clean_maybe_link_udf = UserDefinedFunction(
    clean_maybe_link, StringType(), "clean_maybe_link_field")

def clean_difficulty(col):
    if col is None:
        return None
    cleaned_ish = col.lower()
    if cleaned_ish == "none" or cleaned_ish == "na":
        return None
    return ''.join(cleaned_ish.split(' '))

clean_difficulty_udf = UserDefinedFunction(
    clean_difficulty, StringType(), "clean_difficulty_field")

In [144]:
def process_column(f):
    if f == "Input_project":
        return F.col("Input_project").alias("project")
    elif "Input" in f:
        return f
    elif "difficulty" in f:
        return clean_difficulty_udf(f).alias(f)
    elif "Answer_feedback" in f:
        return f
    else:
        return clean_maybe_link_udf(f).alias(f)
project_human_cleaned_df = project_human_df.select(
    *list(map(process_column, project_human_df.columns)))

In [145]:
project_human_cleaned_df.show()


+----------+----------------------+---------------------------------+----------------------+---------------------------------+-------------------------+------------------------------------+--------------------+----------------------+---------------------------------+
|   project|Answer_code_of_conduct|Answer_code_of_conduct_difficulty|Answer_committer_guide|Answer_committer_guide_difficulty|Answer_contributing_guide|Answer_contributing_guide_difficulty|     Answer_feedback|Answer_mentoring_guide|Answer_mentoring_guide_difficulty|
+----------+----------------------+---------------------------------+----------------------+---------------------------------+-------------------------+------------------------------------+--------------------+----------------------+---------------------------------+
|  accumulo|                  null|                             hard|  https://accumulo....|                             easy|     https://accumulo....|                                easy|                  NA|                  null|                             hard|
|  airavata|                  null|                             hard|                  null|                             hard|     https://airavata....|                                easy|                  NA|                  null|                             hard|
|    ambari|                  null|                             hard|                  null|                             hard|                     null|                                hard|                None|                  null|                             hard|
|     arrow|                  null|                             hard|  https://arrow.apa...|                             easy|     https://arrow.apa...|                                easy|                  NA|                  null|                             hard|
| asterixdb|                  null|                             hard|  https://asterixdb...|                             easy|     https://asterixdb...|                                easy|It will be great ...|                  null|                             hard|
|     atlas|                  null|                             hard|                  null|                             hard|                     null|                                hard|                  NA|                  null|                             hard|
|      beam|                  null|                             hard|  https://beam.apac...|                             easy|     https://beam.apac...|                                easy|                  NA|                  null|                             hard|
|    bigtop|                  null|                             hard|  https://cwiki.apa...|                             easy|     https://cwiki.apa...|                                easy|                  NA|                  null|                             hard|
|bookkeeper|  https://bookkeepe...|                             easy|                  null|                             hard|     https://bookkeepe...|                                easy|                none|                  null|                             hard|
|bookkeeper|                  null|                             hard|  https://bookkeepe...|                             easy|     https://bookkeepe...|                                easy|                  NA|                  null|                             hard|
|   calcite|                  null|                             hard|  https://community...|                             easy|     https://calcite.a...|                                easy|                  NA|                  null|                             hard|
|     camel|                  null|                             hard|  http://camel.apac...|                             easy|     http://camel.apac...|                                easy|                  NA|                  null|                             hard|
|carbondata|                  null|                             hard|  https://carbondat...|                             easy|     https://github.co...|                                easy|           no issues|                  null|                             hard|
| cassandra|  https://whimsy.ap...|                             easy|  https://wiki.apac...|                             easy|     https://wiki.apac...|                                easy|           no issues|                  null|                             hard|
|cloudstack|                  null|                             hard|                  null|                             hard|                     null|                                hard|Perhaps you could...|                  null|                             hard|
|   couchdb|  http://couchdb.ap...|                             easy|                  null|                             hard|     http://couchdb.ap...|                                easy|                  NA|                  null|                             hard|
|       cxf|                  null|                             hard|                  null|                             hard|     http://cxf.apache...|                                easy|                  NA|                  null|                             hard|
|     drill|                  null|                             hard|                  null|                             hard|     https://drill.apa...|                                easy|                  NA|                  null|                             hard|
|     flink|                  null|                             hard|  https://flink.apa...|                             easy|     https://flink.apa...|                                easy|                None|                  null|                             hard|
|     geode|  https://cwiki-tes...|                             easy|  http://incubator....|                             easy|     https://cwiki-tes...|                                easy|           no issues|  https://cwiki.apa...|                             easy|
+----------+----------------------+---------------------------------+----------------------+---------------------------------+-------------------------+------------------------------------+--------------------+----------------------+---------------------------------+
only showing top 20 rows


In [146]:
def clean_gender_field(column):
    if column is None:
        return "na"
    lowered = column.lower()
    if "female" in lowered or "woman" in lowered or "she" in lowered or "her" in lowered or lowered == "f":
        return "female"
    elif "enby" in lowered or "non-binary" in lowered or "they" in lowered:
        return "enby"
    elif lowered == "m" or "male" in lowered or "https://www.linkedin.com/in/moonsoo-lee-4982a511/" in lowered:
        return "male"
    elif "n/a" in lowered or "na" in lowered:
        return "na"
    else:
        return lowered
clean_gender_field_udf = UserDefinedFunction(clean_gender_field, StringType(), "clean_gender_field")

In [147]:
cleaned_asf_people_human_df = asf_people_human_df.select("*",
                           clean_gender_field_udf("Answer_gender").alias("cleaned_gender"))

In [148]:
cleaned_asf_people_human_df_saved = non_blocking_df_save_or_load(
    cleaned_asf_people_human_df,
    "{0}/human_data_cleaned/asf_people_cleaned".format(fs_prefix))


Reusing

In [149]:
cleaned_asf_people_human_df_saved.count()


Out[149]:
2565

In [150]:
cleaned_sampled_contirbutors_human_df = sampled_contirbutors_human_df.select(
    "*",
    clean_gender_field_udf("Answer_gender").alias("cleaned_gender"))

In [151]:
cleaned_sampled_contirbutors_human_df_saved = non_blocking_df_save_or_load(
    cleaned_sampled_contirbutors_human_df,
    "{0}/human_data_cleaned/sampled_contirbutors_cleaned".format(fs_prefix))


Reusing

In [152]:
def group_by_gender(df):
    return df.groupBy(df.cleaned_gender).agg(F.count(df.cleaned_gender))
def group_by_project_count_gender(df):
    by_gender_and_project = df.withColumn(
        "projects_array",
        F.split(df.Input_projects, " ")).select(
        "*",
        F.explode("projects_array").alias("project")).groupBy(
        "project").agg(
          F.sum((df.cleaned_gender == "male").cast("long")).alias("male"),
          F.sum((df.cleaned_gender == "na").cast("long")).alias("unknown"),
          F.sum((df.cleaned_gender == "enby").cast("long")).alias("enby"),
          F.sum((df.cleaned_gender == "female").cast("long")).alias("female"))
    pre_result = by_gender_and_project.select(
        "*",
        ((by_gender_and_project.enby + by_gender_and_project.female) /
         (by_gender_and_project.male + by_gender_and_project.enby + by_gender_and_project.female)))
    result = pre_result.select(
    F.col("*"), F.col("((enby + female) / ((male + enby) + female))").alias("nonmale_percentage"))
    return result

In [153]:
#group_by_project_count_gender(cleaned_asf_people_human_df).show()

In [154]:
asf_agg_by_gender_df = non_blocking_df_save_or_load_csv(
    group_by_gender(cleaned_asf_people_human_df).repartition(1),
    "{0}/asf_people_cleaned_agg_by_gender_3c".format(fs_prefix))


Reusing

In [155]:
asf_agg_by_gender_df.count()


Out[155]:
4

In [156]:
asf_agg_by_gender_and_proj_df = non_blocking_df_save_or_load_csv(
    group_by_project_count_gender(cleaned_asf_people_human_df_saved).repartition(1),
    "{0}/asf_people_cleaned_agg_by_gender_and_proj_3c".format(fs_prefix))


Reusing

In [157]:
asf_agg_by_gender_and_proj_df.select("*").show()


+-------------+----+-------+----+------+--------------------------------------------+--------------------+
|      project|male|unknown|enby|female|((enby + female) / ((male + enby) + female))|  nonmale_percentage|
+-------------+----+-------+----+------+--------------------------------------------+--------------------+
|         lucy|  11|      0|   0|     1|                         0.08333333333333333| 0.08333333333333333|
|    chemistry|  30|      3|   0|     3|                         0.09090909090909091| 0.09090909090909091|
|      vxquery|   7|      0|   0|     0|                                         0.0|                 0.0|
|       roller|   5|      0|   0|     0|                                         0.0|                 0.0|
|       falcon|  17|      0|   0|     0|                                         0.0|                 0.0|
|        geode|  39|      1|   0|     6|                         0.13333333333333333| 0.13333333333333333|
|trafficserver|  34|      3|   0|     4|                         0.10526315789473684| 0.10526315789473684|
|          tez|  32|      1|   0|     2|                        0.058823529411764705|0.058823529411764705|
|       pdfbox|  19|      1|   0|     1|                                        0.05|                0.05|
|        httpd|  51|      0|   0|     0|                                         0.0|                 0.0|
|   carbondata|   8|      0|   0|     3|                          0.2727272727272727|  0.2727272727272727|
|        celix|   6|      0|   0|     1|                         0.14285714285714285| 0.14285714285714285|
|     accumulo|  32|      1|   0|     1|                        0.030303030303030304|0.030303030303030304|
|       wicket|  29|      1|   0|     0|                                         0.0|                 0.0|
|   servicemix|  21|      1|   0|     1|                        0.045454545454545456|0.045454545454545456|
|        twill|   6|      0|   0|     0|                                         0.0|                 0.0|
|     clerezza|   9|      0|   0|     0|                                         0.0|                 0.0|
|      couchdb|  11|      0|   0|     4|                         0.26666666666666666| 0.26666666666666666|
|       bigtop|  25|      1|   0|     0|                                         0.0|                 0.0|
|     marmotta|  10|      1|   0|     0|                                         0.0|                 0.0|
+-------------+----+-------+----+------+--------------------------------------------+--------------------+
only showing top 20 rows


In [158]:
cleaned_sampled_contirbutors_human_df_saved.schema


Out[158]:
StructType(List(StructField(Input_project_name,StringType,true),StructField(Input_id,StringType,true),StructField(Input_Author,StringType,true),StructField(Input_github_username,StringType,true),StructField(Input_email0,StringType,true),StructField(Input_email1,StringType,true),StructField(Input_email2,StringType,true),StructField(Answer_feedback,StringType,true),StructField(Answer_gender,StringType,true),StructField(Answer_web_urls,StringType,true),StructField(cleaned_gender,StringType,true)))

In [159]:
sampled = cleaned_sampled_contirbutors_human_df_saved \
  .withColumn(
    "Input_projects",
    cleaned_sampled_contirbutors_human_df_saved.Input_project_name)

In [160]:
sampled_contirbutors_human_agg_by_gender_and_proj_df = non_blocking_df_save_or_load_csv(
    group_by_project_count_gender(sampled).repartition(1),
    "{0}/sampled_contirbutors_human_agg_by_gender_and_proj_3c".format(fs_prefix)).alias("sampled")


Reusing

In [161]:
sampled_contirbutors_human_agg_by_gender_and_proj_df.show()


+----------+----+-------+----+------+--------------------------------------------+--------------------+
|   project|male|unknown|enby|female|((enby + female) / ((male + enby) + female))|  nonmale_percentage|
+----------+----+-------+----+------+--------------------------------------------+--------------------+
|  zeppelin|  21|      0|   0|     0|                                         0.0|                 0.0|
|    bigtop|  11|      0|   0|     1|                         0.08333333333333333| 0.08333333333333333|
|     arrow|  41|      1|   0|     1|                        0.023809523809523808|0.023809523809523808|
|  airavata|   9|      0|   0|    14|                          0.6086956521739131|  0.6086956521739131|
|     kafka|  56|      2|   0|     6|                          0.0967741935483871|  0.0967741935483871|
|       cxf|  24|      0|   0|     0|                                         0.0|                 0.0|
|  libcloud|  21|      0|   0|     0|                                         0.0|                 0.0|
|     atlas|  12|      1|   0|     1|                         0.07692307692307693| 0.07692307692307693|
|      hive|  34|      0|   0|     4|                         0.10526315789473684| 0.10526315789473684|
|   couchdb|  19|      0|   0|     3|                         0.13636363636363635| 0.13636363636363635|
|    thrift|  24|      0|   0|     2|                         0.07692307692307693| 0.07692307692307693|
|      kudu|  22|      0|   0|     2|                         0.08333333333333333| 0.08333333333333333|
|bookkeeper|  19|      0|   0|     3|                         0.13636363636363635| 0.13636363636363635|
|     flink|  49|      1|   1|     2|                        0.057692307692307696|0.057692307692307696|
|     drill|  20|      2|   0|     2|                         0.09090909090909091| 0.09090909090909091|
|     mesos|  43|      0|   0|     1|                        0.022727272727272728|0.022727272727272728|
|     karaf|  16|      0|   0|     1|                        0.058823529411764705|0.058823529411764705|
|    wicket|  12|      0|   0|     2|                         0.14285714285714285| 0.14285714285714285|
|  accumulo|  12|      0|   0|     0|                                         0.0|                 0.0|
|     storm|  27|      0|   0|     3|                                         0.1|                 0.1|
+----------+----+-------+----+------+--------------------------------------------+--------------------+
only showing top 20 rows


In [ ]:

Attempt to infer Gender off of name. This has problems, see https://ironholds.org/names-gender/ for a discussion on why this is problematic, but if it matches our statistical samples from above it can augment our understanding of the data. However without doing this it's difficult to get much of a picture (see above where we attempt to gender from other sources, the hit rate leaves something to be desired)


In [162]:
def parse_name_info(input_elem):
    from nameparser import HumanName
    # Kind of a hack but wing seems like a commen name more than a title.
    from nameparser.config import CONSTANTS
    CONSTANTS.titles.remove('hon')
    CONSTANTS.titles.remove('wing')
    if " <" in input_elem:
        name_chunk = input_elem.split(" <")[0]
    elif "<" in input_elem:
        name_chunk = input_elem.split("<")[0]
    else:
        name_chunk = input_elem
    if " " not in name_chunk and "." in name_chunk:
        # Handle the convention[ish] of names of first.last
        name_chunk = name_chunk.replace(".", " ")
    parsed = HumanName(name_chunk)
    return {"title": parsed.title, "first": parsed.first}

In [ ]:


In [163]:
parse_name_info_udf = UserDefinedFunction(
    parse_name_info,
    StructType([StructField("title", StringType()), StructField("first", StringType())]),
    "parse_name_info")

In [164]:
authors_with_name = authors_grouped_by_id_saved.select(
    "*", parse_name_info_udf("Author").alias("parsed_info")).cache()

In [165]:
authors_with_name.select("parsed_info.first", "parsed_info.title").show()


+---------------+-----+
|          first|title|
+---------------+-----+
|          Dylan|     |
|          Mario|     |
|           Chad|     |
|       adeneche|     |
|       Ashutosh|     |
|         Venkat|     |
|        Nicolás|     |
|         Selvin|     |
|        cpovirk|     |
|          Pawel|     |
|          Wenwu|     |
|       Valentin|     |
|  Serhii-Harnyk|     |
|         Thopap|     |
|  juanjovazquez|     |
|        nkukhar|     |
|           Doug|     |
|SRIGOPALMOHANTY|     |
| vincentchenfei|     |
|       Matthias|     |
+---------------+-----+
only showing top 20 rows


In [166]:
names_count = authors_with_name.groupBy("parsed_info.first").agg(F.count("*").alias("names_count"))
names_count.sort(names_count.names_count.desc()).show()


+--------+-----------+
|   first|names_count|
+--------+-----------+
|   David|        154|
| Michael|        123|
|    John|        118|
|  Daniel|        111|
|  Andrew|        106|
|   Chris|         99|
|    Mark|         91|
|    Alex|         81|
|   Peter|         79|
|    Paul|         75|
|  Thomas|         74|
|   Jason|         73|
|  Robert|         73|
|   James|         72|
|    Mike|         66|
|Jonathan|         61|
|     Joe|         57|
|   Brian|         57|
|  Martin|         56|
|    Sean|         56|
+--------+-----------+
only showing top 20 rows


In [167]:
authors_with_name.filter(authors_with_name.parsed_info.title != "").select("parsed_info.first", "parsed_info.title", "Author").count()


Out[167]:
13

In [168]:
@F.pandas_udf(StringType())
def lookup_gender_from_name(names):
    # Uses https://pypi.org/project/gender-guesser/ based on https://autohotkey.com/board/topic/20260-gender-verification-by-forename-cmd-line-tool-db/
    import gender_guesser.detector as gender
    d = gender.Detector()
    def inner_detect_gender(name):
        fname = name.split(" ")[0]
        return d.get_gender(fname)
    return names.apply(inner_detect_gender)

In [169]:
def lookup_gender_from_name_genderize(name):
    from genderize import Genderize
    result = Genderize(api_key=genderize_key).get([name])[0]
    if result['gender'] is not None:
        return result
    else:
        return {"name": name, "gender": None, "probability": None, "count": 0}

In [170]:
lookup_gender_from_name_genderize_udf = UserDefinedFunction(
    lookup_gender_from_name_genderize,
    StructType([StructField("name", StringType()), StructField("gender", StringType()),
               StructField("probability", DoubleType()), StructField("count", IntegerType())
               ]),
    "lookup_gender_from_name_genderize")

In [171]:
# Cache to break pipeline and mix UDF types
infered_gender_for_authors = authors_with_name.withColumn(
    "infered_gender",
    lookup_gender_from_name("parsed_info.first")) \
    .cache() \
    .withColumn(
    "genderize_results",
    lookup_gender_from_name_genderize_udf("parsed_info.first")).cache()

In [172]:
infered_gender_for_authors_pq_saved = non_blocking_df_save_or_load(
    infered_gender_for_authors,
    "{0}/infered_gender_for_authors_pq_3".format(fs_prefix))


Reusing

In [173]:
infered_gender_for_authors_pq_saved.take(5)


Out[173]:
[Row(project_name='accumulo', new_unique_id='<dhutchis@mit.edu>', emails=['<dhutchis@mit.edu>'], Author='Dylan Hutchison <dhutchis@mit.edu>', github_username='', latest_commit=datetime.date(2015, 1, 2), parsed_info=Row(title='', first='Dylan'), infered_gender='mostly_male', genderize_results=Row(name='Dylan', gender='male', probability=0.99, count=785)),
 Row(project_name='accumulo', new_unique_id='<mario.pastorelli@teralytics.ch>', emails=['<mario.pastorelli@teralytics.ch>'], Author='Mario Pastorelli <mario.pastorelli@teralytics.ch>', github_username='', latest_commit=datetime.date(2015, 12, 30), parsed_info=Row(title='', first='Mario'), infered_gender='male', genderize_results=Row(name='Mario', gender='male', probability=0.99, count=2026)),
 Row(project_name='activemq', new_unique_id='<czobrisky@gmail.com>', emails=['<czobrisky@gmail.com>'], Author='Chad Zobrisky <czobrisky@gmail.com>', github_username='', latest_commit=datetime.date(2014, 12, 31), parsed_info=Row(title='', first='Chad'), infered_gender='male', genderize_results=Row(name='Chad', gender='male', probability=1.0, count=1028)),
 Row(project_name='arrow', new_unique_id='<adeneche@dremio.com>', emails=['<adeneche@dremio.com>'], Author='adeneche <adeneche@dremio.com>', github_username='', latest_commit=datetime.date(2017, 1, 6), parsed_info=Row(title='', first='adeneche'), infered_gender='unknown', genderize_results=Row(name='adeneche', gender=None, probability=None, count=0)),
 Row(project_name='atlas', new_unique_id='<amestry@apache.org>', emails=['<amestry@apache.org>'], Author='Ashutosh Mestry <amestry@apache.org>', github_username='', latest_commit=datetime.date(2017, 1, 6), parsed_info=Row(title='', first='Ashutosh'), infered_gender='male', genderize_results=Row(name='Ashutosh', gender='male', probability=1.0, count=129))]

In [174]:
infered_relevant_info = infered_gender_for_authors_pq_saved.select(
    infered_gender_for_authors_pq_saved.project_name,
    infered_gender_for_authors_pq_saved.Author,
    infered_gender_for_authors_pq_saved.new_unique_id,
    infered_gender_for_authors_pq_saved.latest_commit,
    infered_gender_for_authors_pq_saved.parsed_info.title.alias("title"),
    infered_gender_for_authors_pq_saved.infered_gender,
    infered_gender_for_authors_pq_saved.genderize_results.gender.alias("genderize_gender"),
    infered_gender_for_authors_pq_saved.genderize_results.probability.alias("genderize_prob"))

In [175]:
infered_relevant_info.show()


+------------+--------------------+--------------------+-------------+-----+--------------+----------------+--------------+
|project_name|              Author|       new_unique_id|latest_commit|title|infered_gender|genderize_gender|genderize_prob|
+------------+--------------------+--------------------+-------------+-----+--------------+----------------+--------------+
|    accumulo|Dylan Hutchison <...|  <dhutchis@mit.edu>|   2015-01-02|     |   mostly_male|            male|          0.99|
|    accumulo|Mario Pastorelli ...|<mario.pastorelli...|   2015-12-30|     |          male|            male|          0.99|
|    activemq|Chad Zobrisky <cz...|<czobrisky@gmail....|   2014-12-31|     |          male|            male|           1.0|
|       arrow|adeneche <adenech...|<adeneche@dremio....|   2017-01-06|     |       unknown|            null|          null|
|       atlas|Ashutosh Mestry <...|<amestry@apache.org>|   2017-01-06|     |          male|            male|           1.0|
|       atlas|Venkat Ranganatha...|<venkat@hortonwor...|   2015-01-02|     |          male|            male|           1.0|
|      aurora|Nicolás Donatucci...|<ndonatucci@medal...|   2017-01-03|     |          male|            null|          null|
|      aurora|Selvin George <sg...|<sgeorge@twitter....|   2013-01-05|     |       unknown|            male|          0.96|
|        beam|cpovirk <cpovirk@...|<cpovirk@google.com>|   2016-01-01|     |       unknown|            null|          null|
|        beam|Pawel Kaczmarczyk...|<p.kaczmarczyk@oc...|   2018-01-02|     |          male|            male|          0.98|
|      bigtop|Wenwu Peng <pengw...|<pengwenwu2008@16...|   2014-01-02|     |       unknown|            null|          null|
|    brooklyn|Valentin Aitken <...|  <bostko@gmail.com>|   2014-12-30|     |          male|            male|           1.0|
|     calcite|Serhii-Harnyk <se...|<serhii.harnyk@gm...|   2015-12-29|     |       unknown|            null|          null|
|       camel|Thopap <Thomas.pa...|<Thomas.papke@icw...|   2017-01-03|     |       unknown|            null|          null|
|       camel|juanjovazquez <jv...|<jvazquez@tecsisa...|   2014-01-01|     |       unknown|            null|          null|
|       camel|nkukhar <kukhar.n...|             nkukhar|   2015-01-02|     |       unknown|            null|          null|
|       camel|Doug Turnbull <dt...|        softwaredoug|   2014-01-04|     |          male|            male|           1.0|
|  carbondata|SRIGOPALMOHANTY <...|<SRIGOPALMOHANTY@...|   2017-01-03|     |       unknown|            null|          null|
|  carbondata|vincentchenfei <v...|<vincent.chenfei@...|   2017-01-03|     |       unknown|            null|          null|
|   cassandra|Matthias Weiser <...|<MWeiser@ardmoref...|   2018-01-02|     |          male|            male|           1.0|
+------------+--------------------+--------------------+-------------+-----+--------------+----------------+--------------+
only showing top 20 rows


In [177]:
infered_relevant_info.groupBy(infered_relevant_info.genderize_gender).agg(F.count("*")).show()


+----------------+--------+
|genderize_gender|count(1)|
+----------------+--------+
|            null|    3892|
|          female|    1040|
|            male|   10508|
+----------------+--------+


In [178]:
infered_relevant_info.groupBy(infered_relevant_info.infered_gender).agg(F.count("*")).show()


+--------------+--------+
|infered_gender|count(1)|
+--------------+--------+
| mostly_female|      90|
|       unknown|    5759|
|        female|     434|
|          andy|     396|
|          male|    8249|
|   mostly_male|     512|
+--------------+--------+


In [179]:
relevant_info = infered_relevant_info.withColumn(
    "Input_projects",
    infered_relevant_info.project_name).withColumn(
    "cleaned_gender",
    clean_gender_field_udf("genderize_gender"))

In [180]:
relevant_info_agg_by_gender_and_proj_df = non_blocking_df_save_or_load_csv(
    group_by_project_count_gender(relevant_info).repartition(1),
    "{0}/relevant_info_agg_by_gender_and_proj_3c".format(fs_prefix))


Reusing

In [181]:
relevant_info_agg_by_gender_and_proj_df.alias("infered").show()


+----------+----+-------+----+------+--------------------------------------------+--------------------+
|   project|male|unknown|enby|female|((enby + female) / ((male + enby) + female))|  nonmale_percentage|
+----------+----+-------+----+------+--------------------------------------------+--------------------+
|      reef|  42|     31|   0|     8|                                        0.16|                0.16|
|   myfaces|  39|      3|   0|     1|                                       0.025|               0.025|
|    groovy| 210|     55|   0|     5|                        0.023255813953488372|0.023255813953488372|
|   phoenix|  66|     24|   0|     1|                        0.014925373134328358|0.014925373134328358|
| trafodion|  71|     49|   0|    36|                          0.3364485981308411|  0.3364485981308411|
| asterixdb|  46|     33|   0|     1|                         0.02127659574468085| 0.02127659574468085|
|     pivot|  10|      2|   0|     1|                         0.09090909090909091| 0.09090909090909091|
|freemarker|   9|     10|   0|     0|                                         0.0|                 0.0|
|      fluo|  19|      4|   0|     2|                         0.09523809523809523| 0.09523809523809523|
|  notebook| 245|     73|   0|    23|                         0.08582089552238806| 0.08582089552238806|
|      hive| 150|     53|   0|    18|                         0.10714285714285714| 0.10714285714285714|
| chemistry|  12|      0|   0|     0|                                         0.0|                 0.0|
|   vxquery|   7|      7|   0|     2|                          0.2222222222222222|  0.2222222222222222|
|     hbase| 194|    100|   0|    24|                         0.11009174311926606| 0.11009174311926606|
|       ant|  46|     11|   0|     4|                                        0.08|                0.08|
|    falcon|  32|     14|   0|     9|                         0.21951219512195122| 0.21951219512195122|
|     geode|  91|     33|   0|    12|                         0.11650485436893204| 0.11650485436893204|
|    crunch|  52|      7|   0|     0|                                         0.0|                 0.0|
|subversion|  69|     13|   0|     1|                        0.014285714285714285|0.014285714285714285|
|      drat|  16|     12|   0|     1|                        0.058823529411764705|0.058823529411764705|
+----------+----+-------+----+------+--------------------------------------------+--------------------+
only showing top 20 rows

Lets see what's correlated - TODO loop this over the different types of data.


In [244]:
joined_sampled_and_infered = sampled_contirbutors_human_agg_by_gender_and_proj_df.join(
    relevant_info_agg_by_gender_and_proj_df,
    on="project")

In [245]:
joined_with_project_info = sampled_contirbutors_human_agg_by_gender_and_proj_df.join(
    project_human_cleaned_df,
    on="project").join(
    agg_post_sentiment,
    on="project")

In [246]:
joined_with_project_info.show()


+----------+----+-------+----+------+--------------------------------------------+-------------------+----------------------+---------------------------------+----------------------+---------------------------------+-------------------------+------------------------------------+--------------------+----------------------+---------------------------------+------------+-----------------+--------------------+--------------------+-----------------+-------------------+--------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|   project|male|unknown|enby|female|((enby + female) / ((male + enby) + female))| nonmale_percentage|Answer_code_of_conduct|Answer_code_of_conduct_difficulty|Answer_committer_guide|Answer_committer_guide_difficulty|Answer_contributing_guide|Answer_contributing_guide_difficulty|     Answer_feedback|Answer_mentoring_guide|Answer_mentoring_guide_difficulty|project_name|sentiment.neg_max|   sentiment.neg_avg|       neg_quantiles|sentiment.pos_max|  sentiment.pos_avg|       pos_quantiles|sentiment.neg_25quantile|sentiment.neg_50quantile|sentiment.neg_75quantile|sentiment.pos_25quantile|sentiment.pos_50quantile|sentiment.pos_75quantile|
+----------+----+-------+----+------+--------------------------------------------+-------------------+----------------------+---------------------------------+----------------------+---------------------------------+-------------------------+------------------------------------+--------------------+----------------------+---------------------------------+------------+-----------------+--------------------+--------------------+-----------------+-------------------+--------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|cloudstack|  29|      1|   0|     2|                         0.06451612903225806|0.06451612903225806|                  null|                             hard|                  null|                             hard|                     null|                                hard|Perhaps you could...|                  null|                             hard|  cloudstack|            0.374|0.030660410167236668|[0.011, 0.027, 0.04]|            0.416|0.07643164731754547|[0.011, 0.027, 0.04]|                   0.011|                   0.027|                    0.04|                   0.011|                   0.027|                    0.04|
|     hbase|  54|      2|   0|     1|                         0.01818181818181818|0.01818181818181818|  https://hbase.apa...|                             easy|  https://hbase.apa...|                             easy|     https://hbase.apa...|                                easy|           no issues|  https://hbase.apa...|                             easy|       hbase|            0.336|0.047520055452864765| [0.0, 0.029, 0.058]|            0.419| 0.0648682994454709| [0.0, 0.029, 0.058]|                     0.0|                   0.029|                   0.058|                     0.0|                   0.029|                   0.058|
|      hive|  34|      0|   0|     4|                         0.10526315789473684|0.10526315789473684|                  null|                             hard|  https://cwiki.apa...|                             easy|     https://cwiki.apa...|                                easy|                  NA|                  null|                             hard|        hive|            0.299|0.031159021113243708| [0.0, 0.022, 0.048]|            0.744| 0.0713851247600767| [0.0, 0.022, 0.048]|                     0.0|                   0.022|                   0.048|                     0.0|                   0.022|                   0.048|
|   phoenix|  22|      0|   0|     0|                                         0.0|                0.0|                  null|                             hard|                  null|                             hard|     https://phoenix.a...|                                easy|                  NA|                  null|                             hard|     phoenix|            0.278|  0.0286186250280833| [0.0, 0.019, 0.278]|            0.604|0.06938373399236117| [0.0, 0.019, 0.278]|                     0.0|                   0.019|                   0.278|                     0.0|                   0.019|                   0.278|
|     spark|  76|      1|   0|     5|                         0.06172839506172839|0.06172839506172839|                  null|                             hard|  https://spark.apa...|                             easy|     https://spark.apa...|                                easy|                  NA|                  null|                             hard|       spark|            0.297|0.029928716020821203|[0.006, 0.023, 0....|            0.503|0.08106636784268348|[0.006, 0.023, 0....|                   0.006|                   0.023|                   0.297|                   0.006|                   0.023|                   0.297|
+----------+----+-------+----+------+--------------------------------------------+-------------------+----------------------+---------------------------------+----------------------+---------------------------------+-------------------------+------------------------------------+--------------------+----------------------+---------------------------------+------------+-----------------+--------------------+--------------------+-----------------+-------------------+--------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+


In [247]:
from itertools import chain

def flatmap(f, items):
        return chain.from_iterable(map(f, items))

def compute_new_columns(df):
    columns = df.columns
    def compute_numeric_columns(column):
        if "Answer_" in column and not "feedback" in column and not "_difficulty" in column:
            return [
                (~ F.isnull(F.col("`{0}`".format(column)))).cast("long").alias(column+"_exists"),
                (~ F.isnull(F.col("`{0}`".format(column))) & (F.col(column + "_difficulty") == "easy")).cast("long").alias(column+"_easy")]
        return []
    my_columns = list(flatmap(compute_numeric_columns, columns))
    my_columns.append("*")
    return df.select(*my_columns)

In [248]:
numeric_df = compute_new_columns(joined_with_project_info)

In [249]:
numeric_df.select("`nonmale_percentage`")


Out[249]:
DataFrame[nonmale_percentage: double]

In [250]:
manual_aggs = [
    F.corr("`nonmale_percentage`", "Answer_mentoring_guide_exists"),
    F.corr("`nonmale_percentage`", "Answer_mentoring_guide_easy"),
    F.corr("`nonmale_percentage`", "Answer_contributing_guide_exists"),
    F.corr("`nonmale_percentage`", "Answer_contributing_guide_easy"),
    F.corr("`nonmale_percentage`", "Answer_committer_guide_exists"),
    F.corr("`nonmale_percentage`", "Answer_committer_guide_easy"),
    F.corr("`nonmale_percentage`", "Answer_code_of_conduct_exists"),
    F.corr("`nonmale_percentage`", "Answer_code_of_conduct_easy")]

In [251]:
computed_aggs = map(
    lambda c: F.corr("`nonmale_percentage`", "`{0}`".format(c)),
    filter(lambda c: "sentiment.pos" in c or "sentiment.neg" in c, numeric_df.columns))

In [252]:
corr_aggs = []
corr_aggs.extend(manual_aggs)
corr_aggs.extend(computed_aggs)

In [253]:
numeric_df.agg(*corr_aggs).show()


+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------+-----------------------------------------------------+-------------------------------------------------------+-----------------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+
|corr(nonmale_percentage, Answer_mentoring_guide_exists)|corr(nonmale_percentage, Answer_mentoring_guide_easy)|corr(nonmale_percentage, Answer_contributing_guide_exists)|corr(nonmale_percentage, Answer_contributing_guide_easy)|corr(nonmale_percentage, Answer_committer_guide_exists)|corr(nonmale_percentage, Answer_committer_guide_easy)|corr(nonmale_percentage, Answer_code_of_conduct_exists)|corr(nonmale_percentage, Answer_code_of_conduct_easy)|corr(nonmale_percentage, `sentiment.neg_max`)|corr(nonmale_percentage, `sentiment.neg_avg`)|corr(nonmale_percentage, `sentiment.pos_max`)|corr(nonmale_percentage, `sentiment.pos_avg`)|corr(nonmale_percentage, `sentiment.neg_25quantile`)|corr(nonmale_percentage, `sentiment.neg_50quantile`)|corr(nonmale_percentage, `sentiment.neg_75quantile`)|corr(nonmale_percentage, `sentiment.pos_25quantile`)|corr(nonmale_percentage, `sentiment.pos_50quantile`)|corr(nonmale_percentage, `sentiment.pos_75quantile`)|
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------+-----------------------------------------------------+-------------------------------------------------------+-----------------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+
|                                    -0.4269689320917422|                                  -0.4269689320917422|                                       -0.1960081503782872|                                     -0.1960081503782872|                                    0.38817892141801813|                                  0.38817892141801813|                                    -0.4269689320917422|                                  -0.4269689320917422|                          0.14828713581114342|                          -0.3173698896837277|                           0.4197358881824746|                          0.47326922463905635|                                  0.2790390742164674|                                0.018343520672052468|                                   -0.41192296901851|                                  0.2790390742164674|                                0.018343520672052468|                                   -0.41192296901851|
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------+-----------------------------------------------------+-------------------------------------------------------+-----------------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+


In [254]:
local_cor = numeric_df.agg(*corr_aggs).collect()

In [255]:
local_cor


Out[255]:
[Row(corr(nonmale_percentage, Answer_mentoring_guide_exists)=-0.4269689320917422, corr(nonmale_percentage, Answer_mentoring_guide_easy)=-0.4269689320917422, corr(nonmale_percentage, Answer_contributing_guide_exists)=-0.1960081503782872, corr(nonmale_percentage, Answer_contributing_guide_easy)=-0.1960081503782872, corr(nonmale_percentage, Answer_committer_guide_exists)=0.38817892141801813, corr(nonmale_percentage, Answer_committer_guide_easy)=0.38817892141801813, corr(nonmale_percentage, Answer_code_of_conduct_exists)=-0.4269689320917422, corr(nonmale_percentage, Answer_code_of_conduct_easy)=-0.4269689320917422, corr(nonmale_percentage, `sentiment.neg_max`)=0.14828713581114342, corr(nonmale_percentage, `sentiment.neg_avg`)=-0.3173698896837277, corr(nonmale_percentage, `sentiment.pos_max`)=0.4197358881824746, corr(nonmale_percentage, `sentiment.pos_avg`)=0.47326922463905635, corr(nonmale_percentage, `sentiment.neg_25quantile`)=0.2790390742164674, corr(nonmale_percentage, `sentiment.neg_50quantile`)=0.018343520672052468, corr(nonmale_percentage, `sentiment.neg_75quantile`)=-0.41192296901851, corr(nonmale_percentage, `sentiment.pos_25quantile`)=0.2790390742164674, corr(nonmale_percentage, `sentiment.pos_50quantile`)=0.018343520672052468, corr(nonmale_percentage, `sentiment.pos_75quantile`)=-0.41192296901851)]

In [256]:
numeric_df.agg(*corr_aggs).toPandas()


Out[256]:
corr(nonmale_percentage, Answer_mentoring_guide_exists) corr(nonmale_percentage, Answer_mentoring_guide_easy) corr(nonmale_percentage, Answer_contributing_guide_exists) corr(nonmale_percentage, Answer_contributing_guide_easy) corr(nonmale_percentage, Answer_committer_guide_exists) corr(nonmale_percentage, Answer_committer_guide_easy) corr(nonmale_percentage, Answer_code_of_conduct_exists) corr(nonmale_percentage, Answer_code_of_conduct_easy) corr(nonmale_percentage, `sentiment.neg_max`) corr(nonmale_percentage, `sentiment.neg_avg`) corr(nonmale_percentage, `sentiment.pos_max`) corr(nonmale_percentage, `sentiment.pos_avg`) corr(nonmale_percentage, `sentiment.neg_25quantile`) corr(nonmale_percentage, `sentiment.neg_50quantile`) corr(nonmale_percentage, `sentiment.neg_75quantile`) corr(nonmale_percentage, `sentiment.pos_25quantile`) corr(nonmale_percentage, `sentiment.pos_50quantile`) corr(nonmale_percentage, `sentiment.pos_75quantile`)
0 -0.426969 -0.426969 -0.196008 -0.196008 0.388179 0.388179 -0.426969 -0.426969 0.148287 -0.31737 0.419736 0.473269 0.279039 0.018344 -0.411923 0.279039 0.018344 -0.411923

In [ ]: